ACCUMULO-3231 Initial stub out of some scripts to automate use of the merkle tree + some random ingest
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/f6af58de Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/f6af58de Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/f6af58de Branch: refs/heads/master Commit: f6af58de5814aa03dddaff79614e10f245ea124b Parents: 5fdbb81 Author: Josh Elser <els...@apache.org> Authored: Mon Oct 13 23:55:10 2014 -0400 Committer: Josh Elser <els...@apache.org> Committed: Fri Nov 7 15:51:53 2014 -0500 ---------------------------------------------------------------------- .../test/replication/merkle/MerkleTree.java | 92 ++++++ .../test/replication/merkle/MerkleTreeNode.java | 131 +++++++++ .../replication/merkle/RangeSerialization.java | 72 +++++ .../replication/merkle/cli/CompareTables.java | 164 +++++++++++ .../replication/merkle/cli/ComputeRootHash.java | 100 +++++++ .../replication/merkle/cli/GenerateHashes.java | 285 +++++++++++++++++++ .../merkle/cli/ManualComparison.java | 98 +++++++ .../merkle/ingest/RandomWorkload.java | 120 ++++++++ .../test/replication/merkle/package-info.java | 38 +++ .../replication/merkle/skvi/DigestIterator.java | 152 ++++++++++ .../merkle-replication/configure-replication.sh | 99 +++++++ test/system/merkle-replication/ingest-data.sh | 39 +++ test/system/merkle-replication/merkle-env.sh | 48 ++++ test/system/merkle-replication/verify-data.sh | 91 ++++++ 14 files changed, 1529 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java new file mode 100644 index 0000000..03eb466 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTree.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication.merkle; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.accumulo.core.util.Pair; + +import com.google.common.collect.Iterables; + +/** + * Simple implementation of a Merkle tree + */ +public class MerkleTree { + protected List<MerkleTreeNode> leaves; + protected String digestAlgorithm; + + public MerkleTree(List<MerkleTreeNode> leaves, String digestAlgorithm) { + this.leaves = leaves; + this.digestAlgorithm = digestAlgorithm; + } + + public MerkleTreeNode getRootNode() throws NoSuchAlgorithmException { + ArrayList<MerkleTreeNode> buffer = new ArrayList<>(leaves.size()); + buffer.addAll(leaves); + + while (buffer.size() > 1) { + // Find two nodes that we want to roll up + Pair<Integer,Integer> pairToJoin = findNextPair(buffer); + + // Make a parent node from them + MerkleTreeNode parent = new MerkleTreeNode(Arrays.asList(buffer.get(pairToJoin.getFirst()), buffer.get(pairToJoin.getSecond())), digestAlgorithm); + + // Insert it back into the "tree" at the position of the first child + buffer.set(pairToJoin.getFirst(), parent); + + // Remove the second child completely + buffer.remove(pairToJoin.getSecond().intValue()); + + // "recurse" + } + + return Iterables.getOnlyElement(buffer); + } + + protected Pair<Integer,Integer> findNextPair(List<MerkleTreeNode> nodes) { + int i = 0, j = 1; + while (i < nodes.size() && j < nodes.size()) { + MerkleTreeNode left = nodes.get(i), right = nodes.get(j); + + // At the same level + if (left.getLevel() == right.getLevel()) { + return new Pair<Integer,Integer>(i, j); + } + + // Peek to see if we have another element + if (j + 1 < nodes.size()) { + // If we do, try to match those + i++; + j++; + } else { + // Otherwise, the last two elements must be paired + return new Pair<Integer,Integer>(i, j); + } + } + + if (2 < nodes.size()) { + throw new IllegalStateException("Should not have exited loop without pairing two elements when we have at least 3 nodes"); + } else if (2 == nodes.size()) { + return new Pair<Integer,Integer>(0, 1); + } else { + throw new IllegalStateException("Must have at least two nodes to pair"); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java new file mode 100644 index 0000000..dfde41a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/MerkleTreeNode.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication.merkle; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.commons.codec.binary.Hex; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Encapsulates a node's level (height) within the tree, the ranges that it covers, and its hash + */ +public class MerkleTreeNode { + private static final Logger log = LoggerFactory.getLogger(MerkleTreeNode.class); + + private Range range; + private int level; + private List<Range> children; + private byte[] hash; + + public MerkleTreeNode(Range range, int level, List<Range> children, byte[] hash) { + this.range = range; + this.level = level; + this.children = children; + this.hash = hash; + } + + public MerkleTreeNode(Key k, Value v) { + range = RangeSerialization.toRange(k); + level = 0; + children = Collections.emptyList(); + hash = v.get(); + } + + public MerkleTreeNode(List<MerkleTreeNode> children, String digestAlgorithm) throws NoSuchAlgorithmException { + level = 0; + this.children = new ArrayList<Range>(children.size()); + MessageDigest digest = MessageDigest.getInstance(digestAlgorithm); + + Range childrenRange = null; + for (MerkleTreeNode child : children) { + this.children.add(child.getRange()); + level = Math.max(child.getLevel(), level); + digest.update(child.getHash()); + + if (null == childrenRange) { + childrenRange = child.getRange(); + } else { + List<Range> overlappingRanges = Range.mergeOverlapping(Arrays.asList(childrenRange, child.getRange())); + if (1 != overlappingRanges.size()) { + log.error("Tried to merge non-contiguous ranges: {} {}", childrenRange, child.getRange()); + throw new IllegalArgumentException("Ranges must be contiguous: " + childrenRange + ", " + child.getRange()); + } + + childrenRange = overlappingRanges.get(0); + } + } + + // Our actual level is one more than the highest level of our children + level++; + + // Roll the hash up the tree + hash = digest.digest(); + + // Set the range to be the merged result of the children + range = childrenRange; + } + + public Range getRange() { + return range; + } + + public int getLevel() { + return level; + } + + public List<Range> getChildren() { + return children; + } + + public byte[] getHash() { + return hash; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(32); + sb.append("range=").append(range).append(" level=").append(level).append(" hash=").append(Hex.encodeHexString(hash)).append(" children=").append(children); + return sb.toString(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof MerkleTreeNode) { + MerkleTreeNode other = (MerkleTreeNode) o; + return range.equals(other.getRange()) && level == other.getLevel() && children.equals(other.getChildren()) && Arrays.equals(hash, other.getHash()); + } + + return false; + } + + @Override + public int hashCode() { + HashCodeBuilder hcb = new HashCodeBuilder(1395, 39532); + return hcb.append(range).append(level).append(children).append(hash).toHashCode(); + } +}
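For illustration, a minimal sketch of how MerkleTree and MerkleTreeNode compose: three leaves whose ranges abut exactly (the same shape that GenerateHashes.endRowsToRanges below produces) are rolled up into a single root. The class name MerkleTreeExample and the leaf payloads are hypothetical, not part of this commit.

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.test.replication.merkle.MerkleTree;
    import org.apache.accumulo.test.replication.merkle.MerkleTreeNode;
    import org.apache.commons.codec.binary.Hex;

    public class MerkleTreeExample {
      public static void main(String[] args) throws Exception {
        MessageDigest md5 = MessageDigest.getInstance("MD5");

        // Three level-0 leaves whose ranges abut and cover the whole row space:
        // (-inf, "m"], ("m", "t"] and ("t", +inf). The digests here stand in for
        // hashes of the key-value pairs that fall in each range.
        List<Range> noChildren = Collections.emptyList();
        List<MerkleTreeNode> leaves = Arrays.asList(
            new MerkleTreeNode(new Range(null, false, "m", true), 0, noChildren, md5.digest("leaf1".getBytes(StandardCharsets.UTF_8))),
            new MerkleTreeNode(new Range("m", false, "t", true), 0, noChildren, md5.digest("leaf2".getBytes(StandardCharsets.UTF_8))),
            new MerkleTreeNode(new Range("t", false, null, false), 0, noChildren, md5.digest("leaf3".getBytes(StandardCharsets.UTF_8))));

        // Pairs of nodes at the same level are joined until a single root
        // remains; its hash summarizes the entire key space.
        MerkleTreeNode root = new MerkleTree(leaves, "MD5").getRootNode();
        System.out.println(Hex.encodeHexString(root.getHash()));
      }
    }

The root node's range covers the entire row space, and its hash is the single value by which two tables are ultimately compared.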
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java new file mode 100644 index 0000000..62bc800 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/RangeSerialization.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication.merkle; + +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +/** + * Serializes a {@link Range} to and from the {@link Key} and {@link Mutation} forms used to store it in an Accumulo table. + */ +public class RangeSerialization { + private static final Text EMPTY = new Text(new byte[0]); + + public static Range toRange(Key key) { + Text holder = new Text(); + key.getRow(holder); + Key startKey; + if (0 == holder.getLength()) { + startKey = null; + } else { + startKey = new Key(holder); + } + + key.getColumnQualifier(holder); + Key endKey; + if (0 == holder.getLength()) { + endKey = null; + } else { + endKey = new Key(holder); + } + + // Don't be inclusive for no bounds on a Range + return new Range(startKey, startKey != null, endKey, endKey != null); + } + + public static Key toKey(Range range) { + Text row = getRow(range); + return new Key(row, EMPTY, getColumnQualifier(range)); + } + + public static Mutation toMutation(Range range, Value v) { + Text row = getRow(range); + Mutation m = new Mutation(row); + m.put(EMPTY, getColumnQualifier(range), v); + return m; + } + + public static Text getRow(Range range) { + return range.isInfiniteStartKey() ? EMPTY : range.getStartKey().getRow(); + } + + public static Text getColumnQualifier(Range range) { + return range.isInfiniteStopKey() ? EMPTY : range.getEndKey().getRow(); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java new file mode 100644 index 0000000..5ac5b68 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/CompareTables.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication.merkle.cli; + +import java.io.FileNotFoundException; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOpts; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.TableExistsException; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Range; +import org.apache.commons.codec.binary.Hex; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; + +/** + * Accepts a set of tables, computes the Merkle tree for each, and prints the root hash of each table. + * <p> + * Output tables for the intermediate hashes are created automatically rather than being required to exist. + * An exception is raised when an output table with the expected name already exists.
+ */ +public class CompareTables { + private static final Logger log = LoggerFactory.getLogger(CompareTables.class); + + public static class CompareTablesOpts extends ClientOpts { + @Parameter(names={"--tables"}, description = "Tables to compare", variableArity=true) + public List<String> tables; + + @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests") + private int numThreads = 4; + + @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") + private String hashName; + + @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should pushdown digest to iterators") + private boolean iteratorPushdown = false; + + @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree") + private String splitsFile = null; + + public List<String> getTables() { + return this.tables; + } + + public void setTables(List<String> tables) { + this.tables = tables; + } + + public int getNumThreads() { + return numThreads; + } + + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public String getHashName() { + return hashName; + } + + public void setHashName(String hashName) { + this.hashName = hashName; + } + + public boolean isIteratorPushdown() { + return iteratorPushdown; + } + + public void setIteratorPushdown(boolean iteratorPushdown) { + this.iteratorPushdown = iteratorPushdown; + } + + public String getSplitsFile() { + return splitsFile; + } + + public void setSplitsFile(String splitsFile) { + this.splitsFile = splitsFile; + } + } + + private CompareTablesOpts opts; + + protected CompareTables() {} + + public CompareTables(CompareTablesOpts opts) { + this.opts = opts; + } + + public Map<String,String> computeAllHashes() throws AccumuloException, AccumuloSecurityException, TableExistsException, NoSuchAlgorithmException, + TableNotFoundException, FileNotFoundException { + final Connector conn = opts.getConnector(); + final Map<String,String> hashesByTable = new HashMap<>(); + + for (String table : opts.getTables()) { + final String outputTableName = table + "_merkle"; + + if (conn.tableOperations().exists(outputTableName)) { + throw new IllegalArgumentException("Expected output table name to not yet exist: " + outputTableName); + } + + conn.tableOperations().create(outputTableName); + + GenerateHashes genHashes = new GenerateHashes(); + Collection<Range> ranges = genHashes.getRanges(opts.getConnector(), table, opts.getSplitsFile()); + + try { + genHashes.run(opts.getConnector(), table, table + "_merkle", opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges); + } catch (Exception e) { + log.error("Error generating hashes for {}", table, e); + throw new RuntimeException(e); + } + + ComputeRootHash computeRootHash = new ComputeRootHash(); + String hash = Hex.encodeHexString(computeRootHash.getHash(conn, outputTableName, opts.getHashName())); + + hashesByTable.put(table, hash); + } + + return hashesByTable; + } + + public static void main(String[] args) throws Exception { + CompareTablesOpts opts = new CompareTablesOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs("CompareTables", args, bwOpts); + + if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) { + throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points"); + } + + CompareTables compareTables = new CompareTables(opts); + 
Map<String,String> tableToHashes = compareTables.computeAllHashes(); + + for (Entry<String,String> entry : tableToHashes.entrySet()) { + System.out.println(entry.getKey() + " " + entry.getValue()); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java new file mode 100644 index 0000000..ea241a6 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ComputeRootHash.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication.merkle.cli; + +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.accumulo.core.cli.ClientOnRequiredTable; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.test.replication.merkle.MerkleTree; +import org.apache.accumulo.test.replication.merkle.MerkleTreeNode; +import org.apache.accumulo.test.replication.merkle.RangeSerialization; +import org.apache.commons.codec.binary.Hex; + +import com.beust.jcommander.Parameter; + +/** + * Given a table created by {@link GenerateHashes} which contains the leaves of a Merkle tree, compute the root node of the Merkle tree which can be quickly + * compared to the root node of another Merkle tree to ascertain equality. 
+ */ +public class ComputeRootHash { + + public static class ComputeRootHashOpts extends ClientOnRequiredTable { + @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") + private String hashName; + + public String getHashName() { + return hashName; + } + + public void setHashName(String hashName) { + this.hashName = hashName; + } + } + + public byte[] getHash(ComputeRootHashOpts opts) throws AccumuloException, AccumuloSecurityException, TableNotFoundException, NoSuchAlgorithmException { + Connector conn = opts.getConnector(); + String table = opts.getTableName(); + + return getHash(conn, table, opts.getHashName()); + } + + public byte[] getHash(Connector conn, String table, String hashName) throws TableNotFoundException, NoSuchAlgorithmException { + List<MerkleTreeNode> leaves = getLeaves(conn, table); + + MerkleTree tree = new MerkleTree(leaves, hashName); + + return tree.getRootNode().getHash(); + } + + protected ArrayList<MerkleTreeNode> getLeaves(Connector conn, String tableName) throws TableNotFoundException { + //TODO make this a bit more resilient to very large merkle trees by lazily reading more data from the table when necessary + final Scanner s = conn.createScanner(tableName, Authorizations.EMPTY); + final ArrayList<MerkleTreeNode> leaves = new ArrayList<MerkleTreeNode>(); + + for (Entry<Key,Value> entry : s) { + Range range = RangeSerialization.toRange(entry.getKey()); + byte[] hash = entry.getValue().get(); + + leaves.add(new MerkleTreeNode(range, 0, Collections.<Range> emptyList(), hash)); + } + + return leaves; + } + + public static void main(String[] args) throws Exception { + ComputeRootHashOpts opts = new ComputeRootHashOpts(); + opts.parseArgs("ComputeRootHash", args); + + ComputeRootHash computeRootHash = new ComputeRootHash(); + byte[] rootHash = computeRootHash.getHash(opts); + + System.out.println(Hex.encodeHexString(rootHash)); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java new file mode 100644 index 0000000..c2c9a5a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/GenerateHashes.java @@ -0,0 +1,285 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.test.replication.merkle.cli; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileNotFoundException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Map.Entry; +import java.util.TreeSet; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOnRequiredTable; +import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.client.MutationsRejectedException; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.test.replication.merkle.RangeSerialization; +import org.apache.accumulo.test.replication.merkle.skvi.DigestIterator; +import org.apache.commons.codec.binary.Hex; +import org.apache.hadoop.io.Text; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.beust.jcommander.Parameter; +import com.google.common.collect.Iterables; + +/** + * Read from a table, compute a Merkle tree and output it to a table. Each key-value pair in the destination table is a leaf node of the Merkle tree. 
+ */ +public class GenerateHashes { + private static final Logger log = LoggerFactory.getLogger(GenerateHashes.class); + + public static class GenerateHashesOpts extends ClientOnRequiredTable { + @Parameter(names = {"-hash", "--hash"}, required = true, description = "type of hash to use") + private String hashName; + + @Parameter(names = {"-o", "--output"}, required = true, description = "output table name, expected to exist and be writable") + private String outputTableName; + + @Parameter(names = {"-nt", "--numThreads"}, required = false, description = "number of concurrent threads calculating digests") + private int numThreads = 4; + + @Parameter(names = {"-iter", "--iterator"}, required = false, description = "Should we push down logic with an iterator") + private boolean iteratorPushdown = false; + + @Parameter(names = {"-s", "--splits"}, required = false, description = "File of splits to use for merkle tree") + private String splitsFile = null; + + public String getHashName() { + return hashName; + } + + public void setHashName(String hashName) { + this.hashName = hashName; + } + + public String getOutputTableName() { + return outputTableName; + } + + public void setOutputTableName(String outputTableName) { + this.outputTableName = outputTableName; + } + + public int getNumThreads() { + return numThreads; + } + + public void setNumThreads(int numThreads) { + this.numThreads = numThreads; + } + + public boolean isIteratorPushdown() { + return iteratorPushdown; + } + + public void setIteratorPushdown(boolean iteratorPushdown) { + this.iteratorPushdown = iteratorPushdown; + } + + public String getSplitsFile() { + return splitsFile; + } + + public void setSplitsFile(String splitsFile) { + this.splitsFile = splitsFile; + } + } + + public Collection<Range> getRanges(Connector conn, String tableName, String splitsFile) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, FileNotFoundException { + if (null == splitsFile) { + log.info("Using table split points"); + Collection<Text> endRows = conn.tableOperations().listSplits(tableName); + return endRowsToRanges(endRows); + } else { + log.info("Using provided split points"); + ArrayList<Text> splits = new ArrayList<Text>(); + + String line; + java.util.Scanner file = new java.util.Scanner(new File(splitsFile), StandardCharsets.UTF_8.name()); + try { + while (file.hasNextLine()) { + line = file.nextLine(); + if (!line.isEmpty()) { + splits.add(new Text(line)); + } + } + } finally { + file.close(); + } + + Collections.sort(splits); + return endRowsToRanges(splits); + } + } + + public void run(GenerateHashesOpts opts) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, NoSuchAlgorithmException, + FileNotFoundException { + Collection<Range> ranges = getRanges(opts.getConnector(), opts.getTableName(), opts.getSplitsFile()); + + run(opts.getConnector(), opts.getTableName(), opts.getOutputTableName(), opts.getHashName(), opts.getNumThreads(), opts.isIteratorPushdown(), ranges); + } + + public void run(final Connector conn, final String inputTableName, final String outputTableName, final String digestName, int numThreads, + final boolean iteratorPushdown, final Collection<Range> ranges) throws TableNotFoundException, AccumuloSecurityException, AccumuloException, + NoSuchAlgorithmException { + if (!conn.tableOperations().exists(outputTableName)) { + throw new IllegalArgumentException(outputTableName + " does not exist, please create it"); + } + + // Get some parallelism + ExecutorService svc = 
Executors.newFixedThreadPool(numThreads); + final BatchWriter bw = conn.createBatchWriter(outputTableName, new BatchWriterConfig()); + + try { + for (final Range range : ranges) { + final MessageDigest digest = getDigestAlgorithm(digestName); + + svc.execute(new Runnable() { + + @Override + public void run() { + Scanner s; + try { + s = conn.createScanner(inputTableName, Authorizations.EMPTY); + } catch (Exception e) { + log.error("Could not get scanner for " + inputTableName, e); + throw new RuntimeException(e); + } + + s.setRange(range); + + Value v = null; + Mutation m = null; + if (iteratorPushdown) { + IteratorSetting cfg = new IteratorSetting(50, DigestIterator.class); + cfg.addOption(DigestIterator.HASH_NAME_KEY, digestName); + s.addScanIterator(cfg); + + // The scanner should only ever return us one Key-Value, otherwise this approach won't work + Entry<Key,Value> entry = Iterables.getOnlyElement(s); + + v = entry.getValue(); + m = RangeSerialization.toMutation(range, v); + } else { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (Entry<Key,Value> entry : s) { + DataOutputStream out = new DataOutputStream(baos); + try { + entry.getKey().write(out); + entry.getValue().write(out); + } catch (Exception e) { + log.error("Error writing {}", entry, e); + throw new RuntimeException(e); + } + + digest.update(baos.toByteArray()); + baos.reset(); + } + + v = new Value(digest.digest()); + m = RangeSerialization.toMutation(range, v); + } + + // Log some progress + log.info("{} computed digest for {} of {}", Thread.currentThread().getName(), range, Hex.encodeHexString(v.get())); + + try { + bw.addMutation(m); + } catch (MutationsRejectedException e) { + log.error("Could not write mutation", e); + throw new RuntimeException(e); + } + } + }); + } + + svc.shutdown(); + + // Wait indefinitely for the scans to complete + while (!svc.isTerminated()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + log.error("Interrupted while waiting for executor service to gracefully complete. 
Exiting now"); + svc.shutdownNow(); + return; + } + } + } finally { + // We can only safely close this when we're exiting or we've completely all tasks + bw.close(); + } + } + + public TreeSet<Range> endRowsToRanges(Collection<Text> endRows) { + ArrayList<Text> sortedEndRows = new ArrayList<Text>(endRows); + Collections.sort(sortedEndRows); + + Text prevEndRow = null; + TreeSet<Range> ranges = new TreeSet<>(); + for (Text endRow : sortedEndRows) { + if (null == prevEndRow) { + ranges.add(new Range(null, false, endRow, true)); + } else { + ranges.add(new Range(prevEndRow, false, endRow, true)); + } + prevEndRow = endRow; + } + + ranges.add(new Range(prevEndRow, false, null, false)); + + return ranges; + } + + protected MessageDigest getDigestAlgorithm(String digestName) throws NoSuchAlgorithmException { + return MessageDigest.getInstance(digestName); + } + + public static void main(String[] args) throws Exception { + GenerateHashesOpts opts = new GenerateHashesOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(GenerateHashes.class.getName(), args, bwOpts); + + if (opts.isIteratorPushdown() && null != opts.getSplitsFile()) { + throw new IllegalArgumentException("Cannot use iterator pushdown with anything other than table split points"); + } + + GenerateHashes generate = new GenerateHashes(); + generate.run(opts); + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java new file mode 100644 index 0000000..9f52233 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/cli/ManualComparison.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication.merkle.cli; + +import java.util.Iterator; +import java.util.Map.Entry; + +import org.apache.accumulo.core.cli.ClientOpts; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.security.Authorizations; + +import com.beust.jcommander.Parameter; + +/** + * Accepts two table names and enumerates all key-values pairs in both checking for correctness. All differences between the two tables will be printed to the + * console. 
+ */ +public class ManualComparison { + + public static class ManualComparisonOpts extends ClientOpts { + @Parameter(names={"--table1"}, required = true, description = "First table") + public String table1; + + @Parameter(names={"--table2"}, required = true, description = "Second table") + public String table2; + } + + /** + * @param args + */ + public static void main(String[] args) throws Exception { + ManualComparisonOpts opts = new ManualComparisonOpts(); + opts.parseArgs("ManualComparison", args); + + Connector conn = opts.getConnector(); + + Scanner s1 = conn.createScanner(opts.table1, Authorizations.EMPTY), s2 = conn.createScanner(opts.table2, Authorizations.EMPTY); + Iterator<Entry<Key,Value>> iter1 = s1.iterator(), iter2 = s2.iterator(); + // The first entry from each table is pre-fetched below; don't advance either iterator until that pair has been compared + boolean incrementFirst = false, incrementSecond = false; + + Entry<Key,Value> entry1 = iter1.next(), entry2 = iter2.next(); + while (iter1.hasNext() && iter2.hasNext()) { + if (incrementFirst) { + entry1 = iter1.next(); + } + if (incrementSecond) { + entry2 = iter2.next(); + } + incrementFirst = false; + incrementSecond = false; + + if (!entry1.equals(entry2)) { + + if (entry1.getKey().compareTo(entry2.getKey()) < 0) { + System.out.println("Exists in original " + entry1); + incrementFirst = true; + } else if (entry2.getKey().compareTo(entry1.getKey()) < 0) { + System.out.println("Exists in replica " + entry2); + incrementSecond = true; + } else { + System.out.println("Differ... " + entry1 + " " + entry2); + incrementFirst = true; + incrementSecond = true; + } + } else { + incrementFirst = true; + incrementSecond = true; + } + } + + System.out.println("\nExtra entries from " + opts.table1); + while (iter1.hasNext()) { + System.out.println(iter1.next()); + } + + System.out.println("\nExtra entries from " + opts.table2); + while (iter2.hasNext()) { + System.out.println(iter2.next()); + } + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java new file mode 100644 index 0000000..5558350 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/ingest/RandomWorkload.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.accumulo.test.replication.merkle.ingest; + +import java.util.Random; + +import org.apache.accumulo.core.cli.BatchWriterOpts; +import org.apache.accumulo.core.cli.ClientOnDefaultTable; +import org.apache.accumulo.core.client.BatchWriter; +import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Connector; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Value; +import org.apache.hadoop.io.Text; + +import com.beust.jcommander.Parameter; + +/** + * Generates random data, with a configurable percentage of the updates being deletes. + */ +public class RandomWorkload { + public static final String DEFAULT_TABLE_NAME = "randomWorkload"; + + public static class RandomWorkloadOpts extends ClientOnDefaultTable { + @Parameter(names = {"-n", "--num"}, required = true, description = "Num records to write") + public long numRecords; + + @Parameter(names = {"-r", "--rows"}, required = true, description = "Range of rows that can be generated") + public int rowMax; + + @Parameter(names = {"-cf", "--colfams"}, required = true, description = "Range of column families that can be generated") + public int cfMax; + + @Parameter(names = {"-cq", "--colquals"}, required = true, description = "Range of column qualifiers that can be generated") + public int cqMax; + + @Parameter(names = {"-d", "--deletes"}, required = false, description = "Percentage of updates that should be deletes") + public int deletePercent = 5; + + public RandomWorkloadOpts() { + super(DEFAULT_TABLE_NAME); + } + + public RandomWorkloadOpts(String tableName) { + super(tableName); + } + } + + public void run(RandomWorkloadOpts opts, BatchWriterConfig cfg) throws Exception { + run(opts.getConnector(), opts.getTableName(), cfg, opts.numRecords, opts.rowMax, opts.cfMax, opts.cqMax, opts.deletePercent); + } + + public void run(final Connector conn, final String tableName, final BatchWriterConfig cfg, final long numRecords, int rowMax, int cfMax, int cqMax, + int deletePercent) throws Exception { + + final Random rowRand = new Random(12345); + final Random cfRand = new Random(12346); + final Random cqRand = new Random(12347); + final Random deleteRand = new Random(12348); + long valueCounter = 0L; + + if (!conn.tableOperations().exists(tableName)) { + conn.tableOperations().create(tableName); + } + + BatchWriter bw = conn.createBatchWriter(tableName, cfg); + try { + final Text row = new Text(), cf = new Text(), cq = new Text(); + final Value value = new Value(); + for (long i = 0; i < numRecords; i++) { + row.set(Integer.toString(rowRand.nextInt(rowMax))); + cf.set(Integer.toString(cfRand.nextInt(cfMax))); + cq.set(Integer.toString(cqRand.nextInt(cqMax))); + + Mutation m = new Mutation(row); + + // Choose a random value in [0, 100) + int deleteValue = deleteRand.nextInt(100); + + // putDelete if the value we chose is less than our delete percentage + if (deleteValue < deletePercent) { + m.putDelete(cf, cq); + } else { + value.set(Long.toString(valueCounter).getBytes()); + m.put(cf, cq, valueCounter, value); + } + + bw.addMutation(m); + + valueCounter++; + } + } finally { + bw.close(); + } + } + + public static void main(String[] args) throws Exception { + RandomWorkloadOpts opts = new RandomWorkloadOpts(); + BatchWriterOpts bwOpts = new BatchWriterOpts(); + opts.parseArgs(RandomWorkload.class.getSimpleName(), args, bwOpts); + + RandomWorkload rw = new RandomWorkload(); + + rw.run(opts, bwOpts.getBatchWriterConfig()); + } +}
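The CLI classes above can also be driven programmatically, which is essentially what CompareTables does for each table it is given. A hedged sketch, assuming an existing Connector and a pre-created, empty output table; the class and method names here are illustrative, not part of this commit:

    import java.util.Collection;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.data.Range;
    import org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash;
    import org.apache.accumulo.test.replication.merkle.cli.GenerateHashes;
    import org.apache.commons.codec.binary.Hex;

    public class RootHashSketch {
      // conn is assumed to be a Connector for the instance; outputTable must
      // already exist and be empty, or GenerateHashes.run() rejects it.
      static String rootHashOf(Connector conn, String table, String outputTable) throws Exception {
        GenerateHashes genHashes = new GenerateHashes();

        // A null splits file means one Range per tablet, from the table's split points
        Collection<Range> ranges = genHashes.getRanges(conn, table, null);

        // Write one leaf digest per Range into the output table:
        // MD5, 4 threads, no iterator pushdown
        genHashes.run(conn, table, outputTable, "MD5", 4, false, ranges);

        // Roll the leaves up into the root node and return its hash
        return Hex.encodeHexString(new ComputeRootHash().getHash(conn, outputTable, "MD5"));
      }
    }

Running this once against a source table and once against its replica yields two hex strings; the tables hold identical data exactly when the strings match.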
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java new file mode 100644 index 0000000..6afcdf5 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/package-info.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * A <a href="http://en.wikipedia.org/wiki/Merkle_tree">Merkle tree</a> is a hash tree and can be used to evaluate equality over large + * files with the ability to ascertain what portions of the files differ. Each leaf of the Merkle tree is some hash of a + * portion of the file, with each leaf corresponding to some "range" within the source file. As such, if all leaves are + * considered as ranges of the source file, the "sum" of all leaves creates a contiguous range over the entire file. + * <P> + * The parent of a group of nodes (typically two, making a binary tree; however this is not required) is the hash of the + * concatenation of its children's hashes. We can construct a full tree by walking up the tree, creating parents from children, until we have a root + * node. To check equality of two files that each have a Merkle tree built, we can very easily compare the value at the + * root of each Merkle tree to know whether or not the files are the same. + * <P> + * Additionally, in the situation where we have two files which we expect to be the same but are not, we can walk back down + * the tree, finding subtrees that are equal and subtrees that are not. Subtrees that are equal correspond to portions of + * the files which are identical, while subtrees that are not equal correspond to discrepancies between the two files. + * <P> + * We can apply this concept to Accumulo, treating a table as a file, and ranges within the table as Accumulo Ranges. We can + * then compute the hashes over each of these Ranges and compute the entire Merkle tree to determine if two tables are + * equivalent.
+ * + * @since 1.7.0 + */ +package org.apache.accumulo.test.replication.merkle; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java new file mode 100644 index 0000000..dcda76a --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/replication/merkle/skvi/DigestIterator.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.test.replication.merkle.skvi; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.util.Collection; +import java.util.Map; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link SortedKeyValueIterator} which attempts to compute a hash over some range of Key-Value pairs. + * <P> + * For the purposes of constructing a Merkle tree, this class will only generate a meaningful result if the (Batch)Scanner will compute a single digest over a + * Range. If the (Batch)Scanner stops and restarts in the middle of a session, incorrect values will be returned and the merkle tree will be invalid. 
+ */ +public class DigestIterator implements SortedKeyValueIterator<Key,Value> { + private static final Logger log = LoggerFactory.getLogger(DigestIterator.class); + public static final String HASH_NAME_KEY = "hash.name"; + + private MessageDigest digest; + private Key topKey; + private Value topValue; + private SortedKeyValueIterator<Key,Value> source; + + @Override + public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { + String hashName = options.get(HASH_NAME_KEY); + if (null == hashName) { + throw new IOException(HASH_NAME_KEY + " must be provided as option"); + } + + try { + this.digest = MessageDigest.getInstance(hashName); + } catch (NoSuchAlgorithmException e) { + throw new IOException(e); + } + + this.topKey = null; + this.topValue = null; + this.source = source; + } + + @Override + public boolean hasTop() { + return null != topKey; + } + + @Override + public void next() throws IOException { + // We can't call next() if we already consumed it all + if (!this.source.hasTop()) { + this.topKey = null; + this.topValue = null; + return; + } + + this.source.next(); + + consume(); + } + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + this.source.seek(range, columnFamilies, inclusive); + + consume(); + } + + protected void consume() throws IOException { + digest.reset(); + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(baos); + + if (!this.source.hasTop()) { + this.topKey = null; + this.topValue = null; + + return; + } + + Key lastKeySeen = null; + while (this.source.hasTop()) { + baos.reset(); + + Key currentKey = this.source.getTopKey(); + lastKeySeen = currentKey; + + currentKey.write(dos); + this.source.getTopValue().write(dos); + + digest.update(baos.toByteArray()); + + this.source.next(); + } + + this.topKey = lastKeySeen; + this.topValue = new Value(digest.digest()); + } + + @Override + public Key getTopKey() { + return topKey; + } + + @Override + public Value getTopValue() { + return topValue; + } + + @Override + public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { + DigestIterator copy = new DigestIterator(); + try { + copy.digest = MessageDigest.getInstance(digest.getAlgorithm()); + } catch (NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + + copy.topKey = this.topKey; + copy.topValue = this.topValue; + copy.source = this.source.deepCopy(env); + + return copy; + } + +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/configure-replication.sh ---------------------------------------------------------------------- diff --git a/test/system/merkle-replication/configure-replication.sh b/test/system/merkle-replication/configure-replication.sh new file mode 100755 index 0000000..44ebdd7 --- /dev/null +++ b/test/system/merkle-replication/configure-replication.sh @@ -0,0 +1,99 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Start: Resolve Script Directory +SOURCE="${BASH_SOURCE[0]}" +while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink + dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) + SOURCE=$(readlink "${SOURCE}") + [[ "${SOURCE}" != /* ]] && SOURCE="${dir}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located +done +dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) +script=$( basename "${SOURCE}" ) +# Stop: Resolve Script Directory + +# Guess at ACCUMULO_HOME and ACCUMULO_CONF_DIR if not already defined +ACCUMULO_HOME=${ACCUMULO_HOME:-"${dir}/../../.."} +ACCUMULO_CONF_DIR=${ACCUMULO_CONF_DIR:-"$ACCUMULO_HOME/conf"} + +# Get the configuration values +. ./merkle-env.sh + +tmpdir=$(mktemp -dt "$0.XXXXXXXXXX") + +source_commands="${tmpdir}/source_commands.txt" + +echo 'Removing old tables and setting replication name on source' + +echo "deletetable -f $SOURCE_TABLE_NAME" >> $source_commands +echo "createtable $SOURCE_TABLE_NAME" >> $source_commands +echo "config -s replication.name=source" >> $source_commands +echo "quit" >> $source_commands + +# Source: drop and create tables, configure unique name for replication and grant perms +echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \ + $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -f $source_commands + +destination_commands="${tmpdir}/destination_commands.txt" + +echo 'Removing old tables and setting replication name on destination' + +echo "deletetable -f $DESTINATION_TABLE_NAME" >> $destination_commands +echo "createtable $DESTINATION_TABLE_NAME" >> $destination_commands +echo "config -s replication.name=destination" >> $destination_commands +echo "quit" >> $destination_commands + +# Destination: drop and create tables, configure unique name for replication and grant perms +echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \ + $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -f $destination_commands + +rm $source_commands +rm $destination_commands + +table_id=$(echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \ + $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -e 'tables -l' | grep "${DESTINATION_TABLE_NAME}" \ + | grep -v "${DESTINATION_MERKLE_TABLE_NAME}" | awk '{print $3}') + +echo "Configuring $SOURCE_TABLE_NAME to replicate to $DESTINATION_TABLE_NAME (id=$table_id)" + +# Define our peer 'destination' with the ReplicaSystem impl, instance name and ZKs +echo "config -s replication.peer.destination=org.apache.accumulo.tserver.replication.AccumuloReplicaSystem,$DESTINATION_INSTANCE,$DESTINATION_ZOOKEEPERS" >> $source_commands +# Username for 'destination' +echo "config -s replication.peer.user.destination=$DESTINATION_ACCUMULO_USER" >> $source_commands +# Password for 'destination' +echo "config -s replication.peer.password.destination=$DESTINATION_ACCUMULO_PASSWORD" >> $source_commands +# Configure replication to 'destination' for $SOURCE_TABLE_NAME +echo "config -t $SOURCE_TABLE_NAME -s 
table.replication.target.destination=$table_id" >> $source_commands +# Enable replication for the table +echo "config -t $SOURCE_TABLE_NAME -s table.replication=true" >> $source_commands +echo "quit" >> $source_commands + +# Configure replication from source to destination and then enable it +echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \ + $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -f $source_commands + +rm $source_commands + +# Add some splits to make ingest faster +echo 'Adding splits...' + +echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \ + $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -e "addsplits -t $SOURCE_TABLE_NAME 1 2 3 4 5 6 7 8 9" + +echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \ + $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -e "addsplits -t $DESTINATION_TABLE_NAME 1 2 3 4 5 6 7 8 9" + http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/ingest-data.sh ---------------------------------------------------------------------- diff --git a/test/system/merkle-replication/ingest-data.sh b/test/system/merkle-replication/ingest-data.sh new file mode 100755 index 0000000..91b8ccc --- /dev/null +++ b/test/system/merkle-replication/ingest-data.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Start: Resolve Script Directory +SOURCE="${BASH_SOURCE[0]}" +while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink + dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) + SOURCE=$(readlink "${SOURCE}") + [[ "${SOURCE}" != /* ]] && SOURCE="${dir}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located +done +dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd ) +script=$( basename "${SOURCE}" ) +# Stop: Resolve Script Directory + +# Guess at ACCUMULO_HOME and ACCUMULO_CONF_DIR if not already defined +ACCUMULO_HOME=${ACCUMULO_HOME:-"${dir}/../../.."} +ACCUMULO_CONF_DIR=${ACCUMULO_CONF_DIR:-"$ACCUMULO_HOME/conf"} + +# Get the configuration values +. 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/merkle-env.sh
----------------------------------------------------------------------
diff --git a/test/system/merkle-replication/merkle-env.sh b/test/system/merkle-replication/merkle-env.sh
new file mode 100755
index 0000000..d405394
--- /dev/null
+++ b/test/system/merkle-replication/merkle-env.sh
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Random data will be written to this table
+SOURCE_TABLE_NAME='replicationSource'
+# Then replicated to this table
+DESTINATION_TABLE_NAME='replicationDestination'
+
+# Merkle tree to be stored in this table for the source table
+SOURCE_MERKLE_TABLE_NAME="${SOURCE_TABLE_NAME}_merkle"
+# Merkle tree to be stored in this table for the destination table
+DESTINATION_MERKLE_TABLE_NAME="${DESTINATION_TABLE_NAME}_merkle"
+
+# Connection information to Accumulo
+SOURCE_ACCUMULO_USER="user"
+SOURCE_ACCUMULO_PASSWORD="password"
+
+DESTINATION_ACCUMULO_USER="${SOURCE_ACCUMULO_USER}"
+DESTINATION_ACCUMULO_PASSWORD="${SOURCE_ACCUMULO_PASSWORD}"
+
+SOURCE_INSTANCE="accumulo"
+DESTINATION_INSTANCE="${SOURCE_INSTANCE}"
+
+SOURCE_ZOOKEEPERS="localhost"
+DESTINATION_ZOOKEEPERS="${SOURCE_ZOOKEEPERS}"
+
+# Accumulo user to be configured on the destination instance
+#REPLICATION_USER="${DESTINATION_ACCUMULO_USER}"
+#REPLICATION_PASSWORD="${DESTINATION_ACCUMULO_PASSWORD}"
+
+# Control amount and distribution of data written
+NUM_RECORDS=100000000
+MAX_ROW=1000000
+MAX_CF=10
+MAX_CQ=100
+DELETE_PERCENT=0
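Note that these defaults point 'source' and 'destination' at the same instance, which exercises the scripts end-to-end on a single cluster but not true cross-instance replication. A genuine two-instance test only requires overriding the DESTINATION_* values; for example (all values here are placeholders):

    DESTINATION_INSTANCE="peerInstance"
    DESTINATION_ZOOKEEPERS="peer-zk1:2181,peer-zk2:2181,peer-zk3:2181"
    DESTINATION_ACCUMULO_USER="replication"
    DESTINATION_ACCUMULO_PASSWORD="replicationPassword"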
http://git-wip-us.apache.org/repos/asf/accumulo/blob/f6af58de/test/system/merkle-replication/verify-data.sh
----------------------------------------------------------------------
diff --git a/test/system/merkle-replication/verify-data.sh b/test/system/merkle-replication/verify-data.sh
new file mode 100755
index 0000000..225d892
--- /dev/null
+++ b/test/system/merkle-replication/verify-data.sh
@@ -0,0 +1,91 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Start: Resolve Script Directory
+SOURCE="${BASH_SOURCE[0]}"
+while [[ -h "${SOURCE}" ]]; do # resolve $SOURCE until the file is no longer a symlink
+  dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+  SOURCE=$(readlink "${SOURCE}")
+  [[ "${SOURCE}" != /* ]] && SOURCE="${dir}/${SOURCE}" # if $SOURCE was a relative symlink, we need to resolve it relative to the path where the symlink file was located
+done
+dir=$( cd -P "$( dirname "${SOURCE}" )" && pwd )
+script=$( basename "${SOURCE}" )
+# Stop: Resolve Script Directory
+
+# Guess at ACCUMULO_HOME and ACCUMULO_CONF_DIR if not already defined
+ACCUMULO_HOME=${ACCUMULO_HOME:-"${dir}/../../.."}
+ACCUMULO_CONF_DIR=${ACCUMULO_CONF_DIR:-"$ACCUMULO_HOME/conf"}
+
+# Get the configuration values (sourced from the script's own directory so this works from any CWD)
+. "${dir}/merkle-env.sh"
+
+tmpdir=$(mktemp -dt "${script}.XXXXXXXXXX") # use ${script}, not $0, since a path would break the template
+
+splits=${tmpdir}/splits
+
+echo 1 >> $splits
+echo 2 >> $splits
+echo 3 >> $splits
+echo 4 >> $splits
+echo 5 >> $splits
+echo 6 >> $splits
+echo 7 >> $splits
+echo 8 >> $splits
+echo 9 >> $splits
+
+commands=${tmpdir}/commands
+
+# Generate leaves of merkle trees for source
+echo "deletetable -f $SOURCE_MERKLE_TABLE_NAME" >> $commands
+echo "createtable $SOURCE_MERKLE_TABLE_NAME" >> $commands
+echo "quit" >> $commands
+
+echo $SOURCE_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $SOURCE_ACCUMULO_USER -z \
+    $SOURCE_INSTANCE $SOURCE_ZOOKEEPERS -f $commands
+
+echo -e "\nGenerating merkle tree hashes for $SOURCE_TABLE_NAME"
+
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.GenerateHashes -t $SOURCE_TABLE_NAME \
+    -o $SOURCE_MERKLE_TABLE_NAME -i $SOURCE_INSTANCE -z $SOURCE_ZOOKEEPERS -u $SOURCE_ACCUMULO_USER \
+    -p $SOURCE_ACCUMULO_PASSWORD -nt 8 -hash MD5 --splits $splits
+
+rm $commands
+
+# Generate leaves of merkle trees for destination
+echo "deletetable -f $DESTINATION_MERKLE_TABLE_NAME" >> $commands
+echo "createtable $DESTINATION_MERKLE_TABLE_NAME" >> $commands
+echo "quit" >> $commands
+
+echo $DESTINATION_ACCUMULO_PASSWORD | ${ACCUMULO_HOME}/bin/accumulo shell -u $DESTINATION_ACCUMULO_USER -z \
+    $DESTINATION_INSTANCE $DESTINATION_ZOOKEEPERS -f $commands
+
+echo -e "\nGenerating merkle tree hashes for $DESTINATION_TABLE_NAME"
+
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.GenerateHashes -t $DESTINATION_TABLE_NAME \
+    -o $DESTINATION_MERKLE_TABLE_NAME -i $DESTINATION_INSTANCE -z $DESTINATION_ZOOKEEPERS -u $DESTINATION_ACCUMULO_USER \
+    -p $DESTINATION_ACCUMULO_PASSWORD -nt 8 -hash MD5 --splits $splits
+
+echo -e "\nComputing root hash:"
+
+# Compute root node of merkle tree
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash -t $SOURCE_MERKLE_TABLE_NAME \
+    -i $SOURCE_INSTANCE -z $SOURCE_ZOOKEEPERS -u $SOURCE_ACCUMULO_USER -p $SOURCE_ACCUMULO_PASSWORD -hash MD5
+
+$ACCUMULO_HOME/bin/accumulo org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash -t $DESTINATION_MERKLE_TABLE_NAME \
+    -i $DESTINATION_INSTANCE -z $DESTINATION_ZOOKEEPERS -u $DESTINATION_ACCUMULO_USER -p $DESTINATION_ACCUMULO_PASSWORD -hash MD5
+
+rm -rf $tmpdir
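verify-data.sh prints the two root hashes but leaves the comparison to the reader. Assuming ComputeRootHash writes only the hash to stdout, the check could be scripted along these lines (a sketch reusing the merkle-env.sh defaults, not part of this commit):

    # Capture both root hashes and compare them; equal hashes mean the
    # source and destination tables hold identical data.
    src=$("$ACCUMULO_HOME/bin/accumulo" org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash \
      -t replicationSource_merkle -i accumulo -z localhost -u user -p password -hash MD5)
    dst=$("$ACCUMULO_HOME/bin/accumulo" org.apache.accumulo.test.replication.merkle.cli.ComputeRootHash \
      -t replicationDestination_merkle -i accumulo -z localhost -u user -p password -hash MD5)
    if [[ "$src" == "$dst" ]]; then
      echo "Root hashes match: tables are consistent"
    else
      echo "Root hashes differ: tables have diverged (or replication is still catching up)"
    fi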