This is an automated email from the ASF dual-hosted git repository.

cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 4641c55e933ba0efa647562ea2c63caab63defcd
Merge: 60f46e1abd a2ce072e5c
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
AuthorDate: Fri Oct 27 18:44:02 2023 -0400

    Merge branch '2.1'

 .../java/org/apache/accumulo/manager/Manager.java  |  6 ++
 .../accumulo/manager/TabletGroupWatcher.java       | 27 +++++++-
 .../apache/accumulo/manager/state/MergeStats.java  | 23 +++++++
 .../accumulo/manager/state/MergeStatsTest.java     | 71 ++++++++++++++++++++++
 .../apache/accumulo/test/manager/MergeStateIT.java | 19 ++++++
 5 files changed, 145 insertions(+), 1 deletion(-)

diff --cc server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 009a407dd0,196b50d76c..8142af7c3f
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@@ -648,8 -645,25 +648,14 @@@ public class Manager extends AbstractSe
              case COMPLETE:
                break;
              case STARTED:
 -            case SPLITTING:
 -              return TabletGoalState.HOSTED;
 -            case WAITING_FOR_CHOPPED:
 -              if 
(tls.getState(tserverSet.getCurrentServers()).equals(TabletState.HOSTED)) {
 -                if (tls.chopped) {
 -                  return TabletGoalState.UNASSIGNED;
 -                }
 -              } else if (tls.chopped && tls.walogs.isEmpty()) {
 -                return TabletGoalState.UNASSIGNED;
 -              }
 -
                return TabletGoalState.HOSTED;
              case WAITING_FOR_OFFLINE:
+               // If we have walogs we need to be HOSTED to recover
+               if (!tls.walogs.isEmpty()) {
+                 return TabletGoalState.HOSTED;
+               } else {
+                 return TabletGoalState.UNASSIGNED;
+               }
              case MERGING:
                return TabletGoalState.UNASSIGNED;
            }
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index bbee9aea48,4ffeee967d..b99bf3473b
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@@ -836,65 -801,15 +847,74 @@@ abstract class TabletGroupWatcher exten
          Key key = entry.getKey();
          Value value = entry.getValue();
  
+         // Verify that Tablet is offline
+         if (isTabletAssigned(key)) {
+           throw new IllegalStateException(
+               "Tablet " + key.getRow() + " is assigned during a merge!");
+           // Verify that Tablet has no WALs
+         } else if (key.getColumnFamily().equals(LogColumnFamily.NAME)) {
+           throw new IllegalStateException("Tablet " + key.getRow() + " has 
walogs during a merge!");
 -        } else if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
 -          m.put(key.getColumnFamily(), key.getColumnQualifier(), value);
++        }
++
 +        final KeyExtent keyExtent = KeyExtent.fromMetaRow(key.getRow());
 +
 +        // Keep track of the last Key Extent seen so we can use it to fence
 +        // off RFiles when merging the metadata
 +        if (lastExtent != null && !keyExtent.equals(lastExtent)) {
 +          previousKeyExtent = lastExtent;
 +        }
 +
 +        // Special case to handle the highest/stop tablet, which is where 
files are
 +        // merged to. The existing merge code won't delete files from this 
tablet
 +        // so we need to handle the deletes in this tablet when fencing files.
 +        // We may be able to make this simpler in the future.
 +        if (keyExtent.equals(stopExtent)) {
 +          if (previousKeyExtent != null
 +              && key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
 +
 +            // Fence off existing files by the end row of the previous tablet 
and current tablet
 +            final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
 +            // The end row should be inclusive for the current tablet and the 
previous end row
 +            // should be exclusive for the start row
 +            Range fenced = new Range(previousKeyExtent.endRow(), false, 
keyExtent.endRow(), true);
 +
 +            // Clip range if exists
 +            fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
 +
 +            final StoredTabletFile newFile = 
StoredTabletFile.of(existing.getPath(), fenced);
 +            // If the existing metadata does not match then we need to delete 
the old
 +            // and replace with a new range
 +            if (!existing.equals(newFile)) {
 +              m.putDelete(DataFileColumnFamily.NAME, 
existing.getMetadataText());
 +              m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
 +            }
 +
 +            fileCount++;
 +          }
 +          // For the highest tablet we only care about the 
DataFileColumnFamily
 +          continue;
 +        }
 +
 +        // Handle metadata updates for all other tablets except the highest 
tablet
 +        // Ranges are created for the files and then added to the highest 
tablet in
 +        // the merge range. Deletes are handled later for the old files when 
the tablets
 +        // are removed.
 +        if (key.getColumnFamily().equals(DataFileColumnFamily.NAME)) {
 +          final StoredTabletFile existing = 
StoredTabletFile.of(key.getColumnQualifier());
 +
 +          // Fence off files by the previous tablet and current tablet that 
is being merged
 +          // The end row should be inclusive for the current tablet and the 
previous end row should
 +          // be exclusive for the start row.
 +          Range fenced = new Range(previousKeyExtent != null ? 
previousKeyExtent.endRow() : null,
 +              false, keyExtent.endRow(), true);
 +
 +          // Clip range with the tablet range if the range already exists
 +          fenced = existing.hasRange() ? existing.getRange().clip(fenced) : 
fenced;
 +
 +          // Move the file and range to the last tablet
 +          StoredTabletFile newFile = StoredTabletFile.of(existing.getPath(), 
fenced);
 +          m.put(key.getColumnFamily(), newFile.getMetadataText(), value);
 +
            fileCount++;
          } else if (TabletColumnFamily.PREV_ROW_COLUMN.hasColumns(key)
              && firstPrevRowValue == null) {
diff --cc 
server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
index 27409a1a9f,5cecbfe98b..0f1bcbee0c
--- 
a/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
+++ 
b/server/manager/src/main/java/org/apache/accumulo/manager/state/MergeStats.java
@@@ -198,12 -271,24 +210,23 @@@ public class MergeStats 
          break;
        }
      }
 -    log.debug("chopped {} v.chopped {} unassigned {} v.unassigned {} 
verify.total {}", chopped,
 -        verify.chopped, unassigned, verify.unassigned, verify.total);
 +    log.debug("unassigned {} v.unassigned {} verify.total {}", unassigned, 
verify.unassigned,
 +        verify.total);
  
 -    return chopped == verify.chopped && unassigned == verify.unassigned
 -        && unassigned == verify.total;
 +    return unassigned == verify.unassigned && unassigned == verify.total;
    }
  
+   @VisibleForTesting
+   void verifyState(MergeInfo info, MergeState expectedState) {
+     Preconditions.checkState(info.getState() == expectedState, "Unexpected 
merge state %s",
+         info.getState());
+   }
+ 
+   @VisibleForTesting
+   boolean verifyWalogs(TabletLocationState tls) {
+     return tls.walogs.isEmpty();
+   }
+ 
    public static void main(String[] args) throws Exception {
      ServerUtilOpts opts = new ServerUtilOpts();
      opts.parseArgs(MergeStats.class.getName(), args);
diff --cc 
server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
index 0000000000,c9ad9763c0..3e44f42981
mode 000000,100644..100644
--- 
a/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
+++ 
b/server/manager/src/test/java/org/apache/accumulo/manager/state/MergeStatsTest.java
@@@ -1,0 -1,71 +1,71 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *   https://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing,
+  * software distributed under the License is distributed on an
+  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  * KIND, either express or implied.  See the License for the
+  * specific language governing permissions and limitations
+  * under the License.
+  */
+ package org.apache.accumulo.manager.state;
+ 
+ import static org.junit.jupiter.api.Assertions.assertFalse;
+ import static org.junit.jupiter.api.Assertions.assertThrows;
+ import static org.junit.jupiter.api.Assertions.assertTrue;
+ 
+ import java.util.Collection;
+ import java.util.List;
+ 
+ import org.apache.accumulo.core.data.TableId;
+ import org.apache.accumulo.core.dataImpl.KeyExtent;
+ import org.apache.accumulo.core.metadata.TabletLocationState;
+ import 
org.apache.accumulo.core.metadata.TabletLocationState.BadLocationStateException;
+ import org.apache.accumulo.server.manager.state.MergeInfo;
+ import org.apache.accumulo.server.manager.state.MergeInfo.Operation;
+ import org.apache.accumulo.server.manager.state.MergeState;
+ import org.apache.hadoop.io.Text;
+ import org.junit.jupiter.api.Test;
+ 
+ public class MergeStatsTest {
+ 
+   @Test
+   public void testVerifyState() {
+     KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), 
new Text("begin"));
+     MergeInfo mergeInfo = new MergeInfo(keyExtent, Operation.MERGE);
+     MergeStats stats = new MergeStats(mergeInfo);
+     mergeInfo.setState(MergeState.WAITING_FOR_OFFLINE);
+ 
+     // Verify WAITING_FOR_OFFLINE does not throw an exception
+     stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE);
+ 
+     // State is wrong so should throw exception
 -    mergeInfo.setState(MergeState.WAITING_FOR_CHOPPED);
++    mergeInfo.setState(MergeState.MERGING);
+     assertThrows(IllegalStateException.class,
+         () -> stats.verifyState(mergeInfo, MergeState.WAITING_FOR_OFFLINE));
+   }
+ 
+   @Test
+   public void testVerifyWalogs() throws BadLocationStateException {
+     KeyExtent keyExtent = new KeyExtent(TableId.of("table"), new Text("end"), 
new Text("begin"));
+     MergeStats stats = new MergeStats(new MergeInfo(keyExtent, 
Operation.MERGE));
+ 
+     // Verify that verifyWalogs returns true if there are no walogs, else false
+     assertTrue(stats.verifyWalogs(getState(keyExtent, List.of())));
+     assertFalse(stats.verifyWalogs(getState(keyExtent, 
List.of(List.of("log1")))));
+   }
+ 
+   private TabletLocationState getState(KeyExtent keyExtent, 
Collection<Collection<String>> walogs)
+       throws BadLocationStateException {
 -    return new TabletLocationState(keyExtent, null, null, null, null, walogs, 
true);
++    return new TabletLocationState(keyExtent, null, null, null, null, walogs);
+   }
+ 
+ }
diff --cc test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
index 003b387f1b,21173c3c5e..1a6252b75e
--- a/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/manager/MergeStateIT.java
@@@ -45,6 -46,8 +45,7 @@@ import org.apache.accumulo.core.metadat
  import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
  import org.apache.accumulo.core.security.Authorizations;
  import org.apache.accumulo.core.security.TablePermission;
+ import org.apache.accumulo.core.tabletserver.log.LogEntry;
 -import org.apache.accumulo.core.util.HostAndPort;
  import org.apache.accumulo.manager.state.MergeStats;
  import org.apache.accumulo.server.ServerContext;
  import org.apache.accumulo.server.manager.state.Assignment;
@@@ -195,8 -206,26 +196,26 @@@ public class MergeStateIT extends Confi
        m = TabletColumnFamily.createPrevRowMutation(tablet);
        Collection<Collection<String>> walogs = Collections.emptyList();
        metaDataStateStore.unassign(Collections.singletonList(new 
TabletLocationState(tablet, null,
 -          Location.current(state.someTServer), null, null, walogs, false)), 
null);
 +          Location.current(state.someTServer), null, null, walogs)), null);
  
+       // Add a walog which should keep the state from transitioning to MERGING
+       KeyExtent ke = new KeyExtent(tableId, new Text("t"), new Text("p"));
+       m = new Mutation(ke.toMetaRow());
+       LogEntry logEntry = new LogEntry(ke, 100, "f1");
+       
m.at().family(logEntry.getColumnFamily()).qualifier(logEntry.getColumnQualifier())
+           .timestamp(logEntry.timestamp).put(logEntry.getValue());
+       update(accumuloClient, m);
+ 
+       // Verify state is still WAITING_FOR_OFFLINE
+       stats = scan(state, metaDataStateStore);
+       newState = stats.nextMergeState(accumuloClient, state);
+       assertEquals(MergeState.WAITING_FOR_OFFLINE, newState);
+ 
+       // Delete the walog which will now allow a transition to MERGING
+       m = new Mutation(ke.toMetaRow());
+       m.putDelete(logEntry.getColumnFamily(), logEntry.getColumnQualifier(), 
logEntry.timestamp);
+       update(accumuloClient, m);
+ 
        // now we can split
        stats = scan(state, metaDataStateStore);
        assertEquals(MergeState.MERGING, stats.nextMergeState(accumuloClient, 
state));

Reply via email to