This is an automated email from the ASF dual-hosted git repository. cshannon pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 4641c55e933ba0efa647562ea2c63caab63defcd Merge: 60f46e1abd a2ce072e5c Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> AuthorDate: Fri Oct 27 18:44:02 2023 -0400 Merge branch '2.1' .../java/org/apache/accumulo/manager/Manager.java | 6 ++ .../accumulo/manager/TabletGroupWatcher.java | 27 +++++++- .../apache/accumulo/manager/state/MergeStats.java | 23 +++++++ .../accumulo/manager/state/MergeStatsTest.java | 71 ++++++++++++++++++++++ .../apache/accumulo/test/manager/MergeStateIT.java | 19 ++++++ 5 files changed, 145 insertions(+), 1 deletion(-) diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index 009a407dd0,196b50d76c..8142af7c3f --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@@ -648,8 -645,25 +648,14 @@@ public class Manager extends AbstractSe case COMPLETE: break; case STARTED: - case SPLITTING: - return TabletGoalState.HOSTED; - case WAITING_FOR_CHOPPED: - if (tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) { - if (tls.chopped) { - return TabletGoalState.UNASSIGNED; - } - } else if (tls.chopped && tls.walogs.isEmpty()) { - return TabletGoalState.UNASSIGNED; - } - return TabletGoalState.HOSTED; case WAITING_FOR_OFFLINE: + // If we have walogs we need to be HOSTED to recover + if (!tls.walogs.isEmpty()) { + return TabletGoalState.HOSTED; + } else { + return TabletGoalState.UNASSIGNED; + } case MERGING: return TabletGoalState.UNASSIGNED; } diff --cc server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java index bbee9aea48,4ffeee967d..b99bf3473b --- a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java @@@ -836,65 -801,15 +847,74 @@@ abstract class TabletGroupWatcher exten Key key = entry.getKey(); Value value = entry.getValue(); + // 
Verify that Tablet is offline + if (isTabletAssigned(key)) { + throw new IllegalStateException( + "Tablet " + key.getRow() + " is assigned during a merge!"); + // Verify that Tablet has no WALs + } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) { + throw new IllegalStateException("Tablet " + key.getRow() + " has walogs during a merge!"); - } else if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { - m.put(key.getColumnFamily(), key.getColumnQualifier(), value); ++ } ++ + final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow()); + + // Keep track of the last Key Extent seen so we can use it to fence + // off RFiles when merging the metadata + if (lastExtent != null && !keyExtent.equals(lastExtent)) { + previousKeyExtent = lastExtent; + } + + // Special case to handle the highest/stop tablet, which is where files are + // merged to. The existing merge code won't delete files from this tablet + // so we need to handle the deletes in this tablet when fencing files. + // We may be able to make this simpler in the future. + if (keyExtent.equals(stopExtent)) { + if (previousKeyExtent != null + && key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + + // Fence off existing files by the end row of the previous tablet and current tablet + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + // The end row should be inclusive for the current tablet and the previous end row + // should be exclusive for the start row + Range fenced = new Range(previousKeyExtent.endRow(), false, keyExtent.endRow(), true); + + // Clip range if exists + fenced = existing.hasRange() ? 
existing.getRange().clip(fenced) : fenced; + + final StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + // If the existing metadata does not match then we need to delete the old + // and replace with a new range + if (!existing.equals(newFile)) { + m.putDelete(DataFileColumnFamily.NAME, existing.getMetadataText()); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + } + + fileCount++; + } + // For the highest tablet we only care about the DataFileColumnFamily + continue; + } + + // Handle metadata updates for all other tablets except the highest tablet + // Ranges are created for the files and then added to the highest tablet in + // the merge range. Deletes are handled later for the old files when the tablets + // are removed. + if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) { + final StoredTabletFile existing = StoredTabletFile.of(key.getColumnQualifier()); + + // Fence off files by the previous tablet and current tablet that is being merged + // The end row should be inclusive for the current tablet and the previous end row should + // be exclusive for the start row. + Range fenced = new Range(previousKeyExtent != null ? previousKeyExtent.endRow() : null, + false, keyExtent.endRow(), true); + + // Clip range with the tablet range if the range already exists + fenced = existing.hasRange() ? 
existing.getRange().clip(fenced) : fenced; + + // Move the file and range to the last tablet + StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), fenced); + m.put(key.getColumnFamily(), newFile.getMetadataText(), value); + fileCount++; } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key) && firstPrevRowValue == null) { diff --cc server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java index 27409a1a9f,5cecbfe98b..0f1bcbee0c --- a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java @@@ -198,12 -271,24 +210,23 @@@ public class MergeStats break; } } - log.debug("chopped {} v.chopped {} unassigned {} v.unassigned {} verify.total {}", chopped, - verify.chopped, unassigned, verify.unassigned, verify.total); + log.debug("unassigned {} v.unassigned {} verify.total {}", unassigned, verify.unassigned, + verify.total); - return chopped == verify.chopped && unassigned == verify.unassigned - && unassigned == verify.total; + return unassigned == verify.unassigned && unassigned == verify.total; } + @VisibleForTesting + void verifyState(MergeInfo info, MergeState expectedState) { + Preconditions.checkState(info.getState() == expectedState, "Unexpected merge state %s", + info.getState()); + } + + @VisibleForTesting + boolean verifyWalogs(TabletLocationState tls) { + return tls.walogs.isEmpty(); + } + public static void main(String[] args) throws Exception { ServerUtilOpts opts = new ServerUtilOpts(); opts.parseArgs(MergeStats.class.getName(), args); diff --cc server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java index 0000000000,c9ad9763c0..3e44f42981 mode 000000,100644..100644 --- a/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java +++ b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java @@@ -1,0 -1,71 +1,71 @@@ + /* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + package org.apache.accumulo.manager.state; + + import static org.junit.jupiter.api.Assertions.assertFalse; + import static org.junit.jupiter.api.Assertions.assertThrows; + import static org.junit.jupiter.api.Assertions.assertTrue; + + import java.util.Collection; + import java.util.List; + + import org.apache.accumulo.core.data.TableId; + import org.apache.accumulo.core.dataImpl.KeyExtent; + import org.apache.accumulo.core.metadata.TabletLocationState; + import org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException; + import org.apache.accumulo.server.manager.state.MergeInfo; + import org.apache.accumulo.server.manager.state.MergeInfo.Operation; + import org.apache.accumulo.server.manager.state.MergeState; + import org.apache.hadoop.io.Text; + import org.junit.jupiter.api.Test; + + public class MergeStatsTest { + + @Test + public void testVerifyState() { + KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), new Text("begin")); + MergeInfo mergeInfo = new MergeInfo(keyExtent, Operation.MERGE); + MergeStats stats = new MergeStats(mergeInfo); + mergeInfo.setState(MergeState.WAITING_FOR_OFFLINE); + + // Verify WAITING_FOR_OFFLINE does 
not throw an exception + stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE); + + // State is wrong so should throw exception - mergeInfo.setState(MergeState.WAITING_FOR_CHOPPED); ++ mergeInfo.setState(MergeState.MERGING); + assertThrows(IllegalStateException.class, + () -> stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE)); + } + + @Test + public void testVerifyWalogs() throws BadLocationStateException { + KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), new Text("begin")); + MergeStats stats = new MergeStats(new MergeInfo(keyExtent, Operation.MERGE)); + + // Verify that verifyWalogs returns true if there are no walogs, else false + assertTrue(stats.verifyWalogs(getState(keyExtent, List.of()))); + assertFalse(stats.verifyWalogs(getState(keyExtent, List.of(List.of("log1"))))); + } + + private TabletLocationState getState(KeyExtent keyExtent, Collection<Collection<String>> walogs) + throws BadLocationStateException { - return new TabletLocationState(keyExtent, null, null, null, null, walogs, true); ++ return new TabletLocationState(keyExtent, null, null, null, null, walogs); + } + + } diff --cc test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java index 003b387f1b,21173c3c5e..1a6252b75e --- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java +++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java @@@ -45,6 -46,8 +45,7 @@@ import org.apache.accumulo.core.metadat import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location; import org.apache.accumulo.core.security.Authorizations; import org.apache.accumulo.core.security.TablePermission; + import org.apache.accumulo.core.tabletserver.log.LogEntry; -import org.apache.accumulo.core.util.HostAndPort; import org.apache.accumulo.manager.state.MergeStats; import org.apache.accumulo.server.ServerContext; import org.apache.accumulo.server.manager.state.Assignment; @@@ -195,8 -206,26 +196,26 @@@ public class MergeStateIT extends 
Confi m = TabletColumnFamily.createPrevRowMutation(tablet); Collection<Collection<String>> walogs = Collections.emptyList(); metaDataStateStore.unassign(Collections.singletonList(new TabletLocationState(tablet, null, - Location.current(state.someTServer), null, null, walogs, false)), null); + Location.current(state.someTServer), null, null, walogs)), null); + // Add a walog which should keep the state from transitioning to MERGING + KeyExtent ke = new KeyExtent(tableId, new Text("t"), new Text("p")); + m = new Mutation(ke.toMetaRow()); + LogEntry logEntry = new LogEntry(ke, 100, "f1"); + m.at().family(logEntry.getColumnFamily()).qualifier(logEntry.getColumnQualifier()) + .timestamp(logEntry.timestamp).put(logEntry.getValue()); + update(accumuloClient, m); + + // Verify state is still WAITING_FOR_OFFLINE + stats = scan(state, metaDataStateStore); + newState = stats.nextMergeState(accumuloClient, state); + assertEquals(MergeState.WAITING_FOR_OFFLINE, newState); + + // Delete the walog which will now allow a transition to MERGING + m = new Mutation(ke.toMetaRow()); + m.putDelete(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), logEntry.timestamp); + update(accumuloClient, m); + // now we can split stats = scan(state, metaDataStateStore); assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient, state));