Author: kturner Date: Wed Feb 29 20:37:49 2012 New Revision: 1295255 URL: http://svn.apache.org/viewvc?rev=1295255&view=rev Log: ACCUMULO-436 * made merge resilient to process death in the middle of metadata updates * made delete rows remove the chopped flag from the last tablet * added sanity check in getHighTablet()
Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java?rev=1295255&r1=1295254&r2=1295255&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java (original) +++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/Master.java Wed Feb 29 20:37:49 2012 @@ -41,11 +41,11 @@ import java.util.concurrent.atomic.Atomi import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; -import org.apache.accumulo.core.client.BatchDeleter; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.IsolatedScanner; +import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.RowIterator; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; @@ -1506,57 +1506,36 @@ public class Master implements LiveTServ private void updateMergeState(Map<Text,MergeStats> mergeStatsCache) { for (MergeStats stats : mergeStatsCache.values()) { try { - MergeState update = stats.nextMergeState(); - if (update == MergeState.MERGING) { - if (mergeStarted(stats.getMergeInfo()) 
|| stats.verifyMergeConsistency(getConnector(), Master.this)) { - try { - if (stats.getMergeInfo().isDelete()) { - deleteTablets(stats.getMergeInfo()); - } else { - mergeMetadataRecords(stats.getMergeInfo()); - } - setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE); - } catch (Exception ex) { - log.error("Unable merge metadata table records", ex); - } - } - } + MergeState update = stats.nextMergeState(getConnector(), Master.this); + + // when next state is MERGING, its important to persist this before + // starting the merge... the verification check that is done before + // moving into the merging state could fail if merge starts but does + // not finish if (update == MergeState.COMPLETE) update = MergeState.NONE; if (update != stats.getMergeInfo().getState()) { setMergeState(stats.getMergeInfo(), update); } + + if (update == MergeState.MERGING) { + try { + if (stats.getMergeInfo().isDelete()) { + deleteTablets(stats.getMergeInfo()); + } else { + mergeMetadataRecords(stats.getMergeInfo()); + } + setMergeState(stats.getMergeInfo(), update = MergeState.COMPLETE); + } catch (Exception ex) { + log.error("Unable merge metadata table records", ex); + } + } } catch (Exception ex) { log.error("Unable to update merge state for merge " + stats.getMergeInfo().getRange(), ex); } } } - /** - * Determine if a merge has been started, and was interrupted and needs to be completed. 
- * - * @param mergeInfo - * @return - * @throws AccumuloException - */ - private boolean mergeStarted(MergeInfo mergeInfo) throws AccumuloException { - KeyExtent merge = mergeInfo.getRange(); - KeyExtent highTablet = getHighTablet(merge); - // merge specifies -inf - if (merge.getPrevEndRow() == null) { - // lasttablet must start at -inf - return highTablet.getPrevEndRow() == null; - } - // upper end of merge is set -inf - if (highTablet.getPrevEndRow() == null) { - // nothing can come before this: merge is started - return true; - } - // if prevRow of last tablet is <= merge start, merge has started - Text mergeStart = merge.getPrevEndRow(); - return highTablet.getPrevEndRow().compareTo(mergeStart.getBytes(), 0, mergeStart.getLength()) <= 0; - } - private void deleteTablets(MergeInfo info) throws AccumuloException { KeyExtent range = info.getRange(); log.debug("Deleting tablets for " + range); @@ -1604,18 +1583,21 @@ public class Master implements LiveTServ } } MetadataTable.addDeleteEntries(range, datafiles, SecurityConstants.getSystemCredentials()); - log.debug("Removing metadata table entries in range " + deleteRange); - BatchDeleter bd = conn.createBatchDeleter(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS, 4, 100000l, 1000l, 4); - bd.setRanges(Collections.singleton(deleteRange)); - bd.delete(); - bd.close(); + BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000000l, 100l, 1); + try { + deleteTablets(deleteRange, bw, conn); + } finally { + bw.close(); + } + if (followingTablet != null) { log.debug("Updating prevRow of " + followingTablet + " to " + range.getPrevEndRow()); - BatchWriter bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000l, 100l, 1); + bw = conn.createBatchWriter(Constants.METADATA_TABLE_NAME, 1000l, 100l, 1); try { Mutation m = new Mutation(followingTablet.getMetadataEntry()); ColumnFQ.put(m, Constants.METADATA_PREV_ROW_COLUMN, KeyExtent.encodePrevEndRow(range.getPrevEndRow())); + ColumnFQ.putDelete(m, 
Constants.METADATA_CHOPPED_COLUMN); bw.addMutation(m); bw.flush(); } finally { @@ -1643,7 +1625,7 @@ public class Master implements LiveTServ if (start == null) { start = new Text(); } - Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, true); + Range scanRange = new Range(KeyExtent.getMetadataEntry(range.getTableId(), start), false, stopRow, false); BatchWriter bw = null; try { long fileCount = 0; @@ -1654,14 +1636,13 @@ public class Master implements LiveTServ scanner.setRange(scanRange); ColumnFQ.fetch(scanner, Constants.METADATA_PREV_ROW_COLUMN); ColumnFQ.fetch(scanner, Constants.METADATA_TIME_COLUMN); + ColumnFQ.fetch(scanner, Constants.METADATA_DIRECTORY_COLUMN); scanner.fetchColumnFamily(Constants.METADATA_DATAFILE_COLUMN_FAMILY); Mutation m = new Mutation(stopRow); String maxLogicalTime = null; for (Entry<Key,Value> entry : scanner) { Key key = entry.getKey(); Value value = entry.getValue(); - if (key.getRow().equals(stopRow)) - break; if (key.getColumnFamily().equals(Constants.METADATA_DATAFILE_COLUMN_FAMILY)) { m.put(key.getColumnFamily(), key.getColumnQualifier(), value); fileCount++; @@ -1670,6 +1651,8 @@ public class Master implements LiveTServ firstPrevRowValue = new Value(value); } else if (Constants.METADATA_TIME_COLUMN.hasColumns(key)) { maxLogicalTime = TabletTime.maxMetadataTime(maxLogicalTime, value.toString()); + } else if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) { + bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString())); } } @@ -1688,8 +1671,10 @@ public class Master implements LiveTServ if (!m.getUpdates().isEmpty()) { bw.addMutation(m); - bw.flush(); } + + bw.flush(); + log.debug("Moved " + fileCount + " files to " + stop); if (firstPrevRowValue == null) { @@ -1702,38 +1687,13 @@ public class Master implements LiveTServ log.debug("Setting the prevRow for last tablet: " + stop); bw.addMutation(updatePrevRow); bw.flush(); - - 
// Delete everything in the other tablets - scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); - log.debug("Scanning range " + scanRange); - scanner.setRange(scanRange); - for (Entry<Key,Value> entry : scanner) { - Key key = entry.getKey(); - if (key.getRow().equals(stopRow)) - break; - if (Constants.METADATA_DIRECTORY_COLUMN.hasColumns(key)) { - bw.addMutation(MetadataTable.createDeleteMutation(range.getTableId().toString(), entry.getValue().toString())); - } - - // TODO could group by row - m = new Mutation(key.getRow()); - m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); - log.debug("deleting entry " + key); - bw.addMutation(m); - } - bw.flush(); + + deleteTablets(scanRange, bw, conn); // Clean-up the last chopped marker - scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); - scanner.fetchColumnFamily(Constants.METADATA_CHOPPED_COLUMN_FAMILY); - scanner.setRange(new Range(stopRow, stopRow)); - for (Entry<Key,Value> entry : scanner) { - Key key = entry.getKey(); - m = new Mutation(key.getRow()); - m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); - log.debug("deleting entry " + key); - bw.addMutation(m); - } + m = new Mutation(stopRow); + ColumnFQ.putDelete(m, Constants.METADATA_CHOPPED_COLUMN); + bw.addMutation(m); bw.flush(); } catch (Exception ex) { @@ -1746,6 +1706,36 @@ public class Master implements LiveTServ } } } + + private void deleteTablets(Range scanRange, BatchWriter bw, Connector conn) throws TableNotFoundException, MutationsRejectedException { + Scanner scanner; + Mutation m; + // Delete everything in the other tablets + // group all deletes into tablet into one mutation, this makes tablets + // either dissapear entirely or not all.. this is important for the case + // where the process terminates in the loop below... 
+ scanner = conn.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); + log.debug("Deleting range " + scanRange); + scanner.setRange(scanRange); + RowIterator rowIter = new RowIterator(scanner); + while (rowIter.hasNext()) { + Iterator<Entry<Key,Value>> row = rowIter.next(); + m = null; + while (row.hasNext()) { + Entry<Key,Value> entry = row.next(); + Key key = entry.getKey(); + + if (m == null) + m = new Mutation(key.getRow()); + + m.putDelete(key.getColumnFamily(), key.getColumnQualifier()); + log.debug("deleting entry " + key); + } + bw.addMutation(m); + } + + bw.flush(); + } private KeyExtent getHighTablet(KeyExtent range) throws AccumuloException { try { @@ -1759,7 +1749,11 @@ public class Master implements LiveTServ throw new AccumuloException("No last tablet for a merge " + range); } Entry<Key,Value> entry = iterator.next(); - return new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue())); + KeyExtent highTablet = new KeyExtent(entry.getKey().getRow(), KeyExtent.decodePrevEndRow(entry.getValue())); + if (highTablet.getTableId() != range.getTableId()) { + throw new AccumuloException("No last tablet for merge " + range + " " + highTablet); + } + return highTablet; } catch (Exception ex) { throw new AccumuloException("Unexpected failure finding the last tablet for a merge " + range, ex); } Modified: incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java?rev=1295255&r1=1295254&r2=1295255&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java (original) +++ incubator/accumulo/branches/1.4/src/server/src/main/java/org/apache/accumulo/server/master/state/MergeStats.java Wed 
Feb 29 20:37:49 2012 @@ -93,7 +93,7 @@ public class MergeStats { this.unassigned++; } - public MergeState nextMergeState() throws Exception { + public MergeState nextMergeState(Connector connector, CurrentState master) throws Exception { MergeState state = info.getState(); if (state == MergeState.NONE) return state; @@ -141,7 +141,10 @@ public class MergeStats { } else { log.info(chopped + " tablets are chopped, " + unassigned + " are offline " + info.getRange()); if (unassigned == total && chopped == needsToBeChopped) { - state = MergeState.MERGING; + if (verifyMergeConsistency(connector, master)) + state = MergeState.MERGING; + else + log.info("Merge consistency check failed " + info.getRange()); } else { log.info("Waiting for " + unassigned + " unassigned tablets to be " + total + " " + info.getRange()); } @@ -163,7 +166,7 @@ public class MergeStats { return state; } - public boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException { + private boolean verifyMergeConsistency(Connector connector, CurrentState master) throws TableNotFoundException, IOException { MergeStats verify = new MergeStats(info); Scanner scanner = connector.createScanner(Constants.METADATA_TABLE_NAME, Constants.NO_AUTHS); MetaDataTableScanner.configureScanner(scanner, master); Modified: incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java URL: http://svn.apache.org/viewvc/incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java?rev=1295255&r1=1295254&r2=1295255&view=diff ============================================================================== --- incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java (original) +++ incubator/accumulo/branches/1.4/src/server/src/test/java/org/apache/accumulo/server/master/TestMergeState.java Wed Feb 29 20:37:49 2012 @@ -132,7 
+132,7 @@ public class TestMergeState { // do the state check MergeStats stats = scan(state, metaDataStateStore); - MergeState newState = stats.nextMergeState(); + MergeState newState = stats.nextMergeState(connector, state); Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, newState); // unassign the tablets @@ -141,12 +141,9 @@ public class TestMergeState { deleter.setRanges(Collections.singletonList(new Range())); deleter.delete(); - // now we should be ready to merge + // now we should be ready to merge but, we have an inconsistent !METADATA table stats = scan(state, metaDataStateStore); - Assert.assertEquals(MergeState.MERGING, stats.nextMergeState()); - - // but, we have an inconsistent !METADATA table, so double check - Assert.assertFalse(stats.verifyMergeConsistency(connector, state)); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state)); // finish the split KeyExtent tablet = new KeyExtent(tableId, new Text("p"), new Text("o")); @@ -157,7 +154,7 @@ public class TestMergeState { // onos... 
there's a new tablet online stats = scan(state, metaDataStateStore); - Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState()); + Assert.assertEquals(MergeState.WAITING_FOR_CHOPPED, stats.nextMergeState(connector, state)); // chop it m = tablet.getPrevRowUpdateMutation(); @@ -165,7 +162,7 @@ public class TestMergeState { update(connector, m); stats = scan(state, metaDataStateStore); - Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState()); + Assert.assertEquals(MergeState.WAITING_FOR_OFFLINE, stats.nextMergeState(connector, state)); // take it offline m = tablet.getPrevRowUpdateMutation(); @@ -174,10 +171,7 @@ public class TestMergeState { // now we can split stats = scan(state, metaDataStateStore); - Assert.assertEquals(MergeState.MERGING, stats.nextMergeState()); - - // and we have consistent !METADATA table - Assert.assertTrue(stats.verifyMergeConsistency(connector, state)); + Assert.assertEquals(MergeState.MERGING, stats.nextMergeState(connector, state)); }