Author: kturner Date: Wed Apr 10 17:37:58 2013 New Revision: 1466582 URL: http://svn.apache.org/r1466582 Log: ACCUMULO-1044 fixed some issues w/ metadata constraint bulk flag check, made the check more strict, and added a lot of tests for it
Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java Modified: accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java?rev=1466582&r1=1466581&r2=1466582&view=diff ============================================================================== --- accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java (original) +++ accumulo/branches/1.5/server/src/main/java/org/apache/accumulo/server/constraints/MetadataConstraints.java Wed Apr 10 17:37:58 2013 @@ -31,6 +31,7 @@ import org.apache.accumulo.core.data.Val import org.apache.accumulo.core.util.ColumnFQ; import org.apache.accumulo.core.util.MetadataTable.DataFileValue; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator; import org.apache.accumulo.server.client.HdfsZooInstance; import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator; import org.apache.accumulo.server.zookeeper.ZooCache; @@ -139,6 +140,8 @@ public class MetadataConstraints impleme violations = addViolation(violations, 5); } + boolean checkedBulk = false; + for (ColumnUpdate columnUpdate : colUpdates) { Text columnFamily = new Text(columnUpdate.getColumnFamily()); @@ -168,7 +171,7 @@ public class MetadataConstraints impleme } else if (columnFamily.equals(Constants.METADATA_SCANFILE_COLUMN_FAMILY)) { } else if (columnFamily.equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) { - if (!columnUpdate.isDeleted()) { + if (!columnUpdate.isDeleted() && !checkedBulk) { // splits, which also write the time reference, are allowed to write this reference even when // 
the transaction is not running because the other half of the tablet is holding a reference // to the file. @@ -177,26 +180,42 @@ public class MetadataConstraints impleme // but it writes everything. We allow it to re-write the bulk information if it is setting the location. // See ACCUMULO-1230. boolean isLocationMutation = false; + + HashSet<Text> dataFiles = new HashSet<Text>(); + HashSet<Text> loadedFiles = new HashSet<Text>(); + + String tidString = new String(columnUpdate.getValue()); + int otherTidCount = 0; + for (ColumnUpdate update : mutation.getUpdates()) { if (new ColumnFQ(update).equals(Constants.METADATA_DIRECTORY_COLUMN)) { isSplitMutation = true; - } - if (update.getColumnFamily().equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) { + } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY)) { isLocationMutation = true; + } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) { + dataFiles.add(new Text(update.getColumnQualifier())); + } else if (new Text(update.getColumnFamily()).equals(Constants.METADATA_BULKFILE_COLUMN_FAMILY)) { + loadedFiles.add(new Text(update.getColumnQualifier())); + + if (!new String(update.getValue()).equals(tidString)) { + otherTidCount++; + } } } if (!isSplitMutation && !isLocationMutation) { - String tidString = new String(columnUpdate.getValue()); long tid = Long.parseLong(tidString); + try { - if (!new ZooArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) { + if (otherTidCount > 0 || !dataFiles.equals(loadedFiles) || !getArbitrator().transactionAlive(Constants.BULK_ARBITRATOR_TYPE, tid)) { violations = addViolation(violations, 8); } } catch (Exception ex) { violations = addViolation(violations, 8); } } + + checkedBulk = true; } } else { if (!isValidColumn(columnUpdate)) { @@ -248,6 +267,10 @@ public class MetadataConstraints impleme return violations; } + protected Arbitrator getArbitrator() { + return new 
ZooArbitrator(); + } + public String getViolationDescription(short violationCode) { switch (violationCode) { case 1: Modified: accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java URL: http://svn.apache.org/viewvc/accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java?rev=1466582&r1=1466581&r2=1466582&view=diff ============================================================================== --- accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java (original) +++ accumulo/branches/1.5/server/src/test/java/org/apache/accumulo/server/constraints/MetadataConstraintsTest.java Wed Apr 10 17:37:58 2013 @@ -18,6 +18,7 @@ package org.apache.accumulo.server.const import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import java.util.List; @@ -25,6 +26,7 @@ import org.apache.accumulo.core.Constant import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.data.Mutation; import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.fate.zookeeper.TransactionWatcher.Arbitrator; import org.apache.hadoop.io.Text; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -32,6 +34,26 @@ import org.junit.Test; public class MetadataConstraintsTest { + static class TestMetadataConstraints extends MetadataConstraints { + @Override + protected Arbitrator getArbitrator() { + return new Arbitrator() { + + @Override + public boolean transactionAlive(String type, long tid) throws Exception { + if (tid == 9) + throw new RuntimeException("txid 9 reserved for future use"); + return tid == 5 || tid == 7; + } + + @Override + public boolean transactionComplete(String type, long tid) throws Exception { + return tid != 5 && tid != 7; + } + }; + } + } + @Test public void testCheck() { 
Logger.getLogger(AccumuloConfiguration.class).setLevel(Level.ERROR); @@ -106,13 +128,112 @@ public class MetadataConstraintsTest { assertEquals(1, violations.size()); assertEquals(Short.valueOf((short) 4), violations.get(0)); + } + + @Test + public void testBulkFileCheck() { + MetadataConstraints mc = new TestMetadataConstraints(); + Mutation m; + List<Short> violations; + + // inactive txid m = new Mutation(new Text("0;foo")); m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes())); violations = mc.check(null, m); - assertNotNull(violations); assertEquals(1, violations.size()); assertEquals(Short.valueOf((short)8), violations.get(0)); + + // txid that throws exception + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("9".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // active txid w/ file + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // active txid w/o file + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // two active txids w/ files + m = new Mutation(new Text("0;foo")); + 
m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes())); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("7".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // two files w/ one active txid + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes())); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("5".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("1,1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // two loaded w/ one active txid and one file + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + m.put(Constants.METADATA_DATAFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("1,1".getBytes())); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile2"), new Value("5".getBytes())); + violations = mc.check(null, m); + assertNotNull(violations); + assertEquals(1, violations.size()); + assertEquals(Short.valueOf((short) 8), violations.get(0)); + + // active txid, mutation that looks like split + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes())); + violations = mc.check(null, m); + 
assertNull(violations); + + // inactive txid, mutation that looks like split + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes())); + Constants.METADATA_DIRECTORY_COLUMN.put(m, new Value("/t1".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // active txid, mutation that looks like a load + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("5".getBytes())); + m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), new Value("127.0.0.1:9997".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // inactive txid, mutation that looks like a load + m = new Mutation(new Text("0;foo")); + m.put(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile"), new Value("12345".getBytes())); + m.put(Constants.METADATA_CURRENT_LOCATION_COLUMN_FAMILY, new Text("789"), new Value("127.0.0.1:9997".getBytes())); + violations = mc.check(null, m); + assertNull(violations); + + // deleting a load flag + m = new Mutation(new Text("0;foo")); + m.putDelete(Constants.METADATA_BULKFILE_COLUMN_FAMILY, new Text("/someFile")); + violations = mc.check(null, m); + assertNull(violations); + + } }