This is an automated email from the ASF dual-hosted git repository.

hkeebler pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new 840fe43  Fix #1365 2.1 Upgrade processing for #1043 ~del (#1366)
840fe43 is described below

commit 840fe4352badb5256151b3ef3a947ecd7a2fa49b
Author: hkeebler <49656678+hkeeb...@users.noreply.github.com>
AuthorDate: Fri Sep 27 10:56:17 2019 -0400

    Fix #1365 2.1 Upgrade processing for #1043 ~del (#1366)
    
    * Fix #1365 2.1 Upgrade processing for #1043 ~del
---
 .../accumulo/core/metadata/schema/Ample.java       |  17 +-
 .../core/metadata/schema/MetadataSchema.java       |   8 +
 .../accumulo/server/metadata/ServerAmpleImpl.java  |  23 +--
 .../master/upgrade/UpgradeCoordinator.java         |   4 +-
 .../accumulo/master/upgrade/Upgrader9to10.java     | 101 ++++++++++
 .../test/functional/GarbageCollectorIT.java        |  10 +-
 .../test/upgrade/GCUpgrade9to10TestIT.java         | 219 +++++++++++++++++++++
 7 files changed, 363 insertions(+), 19 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 9b8a692..61fbf06 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -65,12 +65,16 @@ public interface Ample {
    * derived from the table id.
    */
   public enum DataLevel {
-    ROOT(null), METADATA(RootTable.NAME), USER(MetadataTable.NAME);
+    ROOT(null, null),
+    METADATA(RootTable.NAME, RootTable.ID),
+    USER(MetadataTable.NAME, MetadataTable.ID);
 
     private final String table;
+    private final TableId id;
 
-    private DataLevel(String table) {
+    private DataLevel(String table, TableId id) {
       this.table = table;
+      this.id = id;
     }
 
     /**
@@ -81,6 +85,15 @@ public interface Ample {
         throw new UnsupportedOperationException();
       return table;
     }
+
+    /**
+     * @return The Id of the Accumulo table in which this data level stores its metadata.
+     */
+    public TableId tableId() {
+      if (id == null)
+        throw new UnsupportedOperationException();
+      return id;
+    }
   }
 
   /**
diff --git 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index b2fdede..1a88785 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -270,6 +270,14 @@ public class MetadataSchema {
       return row.substring(encoded_prefix_length);
     }
 
+    /**
+     * Value to indicate that the row has been skewed/encoded.
+     */
+    public static class SkewedKeyValue {
+      public static final String STR_NAME = "skewed";
+      public static final Value NAME = new Value(STR_NAME);
+    }
+
   }
 
   /**
diff --git 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index df59341..dc8af4f 100644
--- 
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ 
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -25,6 +25,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
@@ -35,13 +36,12 @@ import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.PartialKey;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
-import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
 import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.AmpleImpl;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.fate.zookeeper.ZooUtil;
 import org.apache.accumulo.server.ServerContext;
@@ -51,7 +51,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Iterators;
 
 public class ServerAmpleImpl extends AmpleImpl implements Ample {
 
@@ -134,7 +133,7 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
 
     try (BatchWriter writer = context.createBatchWriter(level.metaTable())) {
       for (String path : paths) {
-        Mutation m = new 
Mutation(MetadataSchema.DeletesSection.encodeRow(path));
+        Mutation m = new Mutation(DeletesSection.encodeRow(path));
         m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
         writer.addMutation(m);
       }
@@ -155,9 +154,9 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
 
       return candidates.iterator();
     } else if (level == DataLevel.METADATA || level == DataLevel.USER) {
-      Range range = MetadataSchema.DeletesSection.getRange();
+      Range range = DeletesSection.getRange();
       if (continuePoint != null && !continuePoint.isEmpty()) {
-        String continueRow = 
MetadataSchema.DeletesSection.encodeRow(continuePoint);
+        String continueRow = DeletesSection.encodeRow(continuePoint);
         range = new Range(new Key(continueRow).followingKey(PartialKey.ROW), 
true,
             range.getEndKey(), range.isEndKeyInclusive());
       }
@@ -169,10 +168,9 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
         throw new RuntimeException(e);
       }
       scanner.setRange(range);
-
-      return Iterators.transform(scanner.iterator(),
-          entry -> 
MetadataSchema.DeletesSection.decodeRow(entry.getKey().getRow().toString()));
-
+      return StreamSupport.stream(scanner.spliterator(), false)
+          .filter(entry -> 
entry.getValue().equals(DeletesSection.SkewedKeyValue.NAME))
+          .map(entry -> 
DeletesSection.decodeRow(entry.getKey().getRow().toString())).iterator();
     } else {
       throw new IllegalArgumentException();
     }
@@ -196,9 +194,8 @@ public class ServerAmpleImpl extends AmpleImpl implements 
Ample {
   public static Mutation createDeleteMutation(ServerContext context, TableId 
tableId,
       String pathToRemove) {
     Path path = context.getVolumeManager().getFullPath(tableId, pathToRemove);
-    Mutation delFlag =
-        new Mutation(new 
Text(MetadataSchema.DeletesSection.encodeRow(path.toString())));
-    delFlag.put(EMPTY_TEXT, EMPTY_TEXT, new Value(new byte[] {}));
+    Mutation delFlag = new Mutation(new 
Text(DeletesSection.encodeRow(path.toString())));
+    delFlag.put(EMPTY_TEXT, EMPTY_TEXT, DeletesSection.SkewedKeyValue.NAME);
     return delFlag;
   }
 
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
 
b/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
index 18ca7b9..38d967d 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/upgrade/UpgradeCoordinator.java
@@ -38,8 +38,8 @@ public class UpgradeCoordinator {
   private boolean haveUpgradedZooKeeper = false;
   private boolean startedMetadataUpgrade = false;
   private int currentVersion;
-  private Map<Integer,Upgrader> upgraders =
-      Map.of(ServerConstants.SHORTEN_RFILE_KEYS, new Upgrader8to9());
+  private Map<Integer,Upgrader> upgraders = 
Map.of(ServerConstants.SHORTEN_RFILE_KEYS,
+      new Upgrader8to9(), ServerConstants.CRYPTO_CHANGES, new Upgrader9to10());
 
   public UpgradeCoordinator(ServerContext ctx) {
     int currentVersion = 
ServerUtil.getAccumuloPersistentVersion(ctx.getVolumeManager());
diff --git 
a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
 
b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
index e2ce6c4..c349f8d 100644
--- 
a/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
+++ 
b/server/master/src/main/java/org/apache/accumulo/master/upgrade/Upgrader9to10.java
@@ -20,6 +20,8 @@ package org.apache.accumulo.master.upgrade;
 import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET;
 import static 
org.apache.accumulo.core.metadata.RootTable.ZROOT_TABLET_GC_CANDIDATES;
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;
 
 import java.io.IOException;
 import java.io.UncheckedIOException;
@@ -27,19 +29,33 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.admin.TimeType;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.file.FileOperations;
 import org.apache.accumulo.core.file.FileSKVIterator;
 import org.apache.accumulo.core.metadata.RootTable;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
 import org.apache.accumulo.core.metadata.schema.MetadataTime;
 import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
+import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.core.tabletserver.log.LogEntry;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
@@ -51,6 +67,7 @@ import org.apache.accumulo.server.fs.FileRef;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.master.state.TServerInstance;
 import org.apache.accumulo.server.metadata.RootGcCandidates;
+import org.apache.accumulo.server.metadata.ServerAmpleImpl;
 import org.apache.accumulo.server.metadata.TabletMutatorBase;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -74,6 +91,14 @@ public class Upgrader9to10 implements Upgrader {
   public static final String ZROOT_TABLET_WALOGS = ZROOT_TABLET + "/walogs";
   public static final String ZROOT_TABLET_CURRENT_LOGS = ZROOT_TABLET + 
"/current_logs";
   public static final String ZROOT_TABLET_PATH = ZROOT_TABLET + "/dir";
+  public static final Value UPGRADED = 
MetadataSchema.DeletesSection.SkewedKeyValue.NAME;
+  public static final String OLD_DELETE_PREFIX = "~del";
+
+  /**
+   * This percentage was taken from the SimpleGarbageCollector; if nothing else is going on
+   * during upgrade then it could be larger.
+   */
+  static final float CANDIDATE_MEMORY_PERCENTAGE = 0.50f;
 
   @Override
   public void upgradeZookeeper(ServerContext ctx) {
@@ -82,6 +107,8 @@ public class Upgrader9to10 implements Upgrader {
 
   @Override
   public void upgradeMetadata(ServerContext ctx) {
+    upgradeFileDeletes(ctx, Ample.DataLevel.METADATA);
+    upgradeFileDeletes(ctx, Ample.DataLevel.USER);
 
   }
 
@@ -352,4 +379,78 @@ public class Upgrader9to10 implements Upgrader {
     }
   }
 
+  public void upgradeFileDeletes(ServerContext ctx, Ample.DataLevel level) {
+
+    String tableName = level.metaTable();
+    AccumuloClient c = ctx;
+
+    // find all deletes
+    try (BatchWriter writer = c.createBatchWriter(tableName, new 
BatchWriterConfig())) {
+      log.info("looking for candidates in table {}", tableName);
+      Iterator<String> oldCandidates = getOldCandidates(ctx, tableName);
+      int t = 0; // no waiting first time through
+      while (oldCandidates.hasNext()) {
+        // give it some time for memory to clean itself up if needed
+        sleepUninterruptibly(t, TimeUnit.SECONDS);
+        List<String> deletes = readCandidatesThatFitInMemory(oldCandidates);
+        log.info("found {} deletes to upgrade", deletes.size());
+        for (String olddelete : deletes) {
+          // create new formatted delete
+          log.trace("upgrading delete entry for {}", olddelete);
+          writer.addMutation(ServerAmpleImpl.createDeleteMutation(ctx, 
level.tableId(), olddelete));
+        }
+        writer.flush();
+        // if nothing thrown then we're good so mark all deleted
+        log.info("upgrade processing completed so delete old entries");
+        for (String olddelete : deletes) {
+          log.trace("deleting old entry for {}", olddelete);
+          writer.addMutation(deleteOldDeleteMutation(olddelete));
+        }
+        writer.flush();
+        t = 3;
+      }
+    } catch (TableNotFoundException | MutationsRejectedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private Iterator<String> getOldCandidates(ServerContext ctx, String 
tableName)
+      throws TableNotFoundException {
+    Range range = MetadataSchema.DeletesSection.getRange();
+    Scanner scanner = ctx.createScanner(tableName, Authorizations.EMPTY);
+    scanner.setRange(range);
+    return StreamSupport.stream(scanner.spliterator(), false)
+        .filter(entry -> !entry.getValue().equals(UPGRADED))
+        .map(entry -> 
entry.getKey().getRow().toString().substring(OLD_DELETE_PREFIX.length()))
+        .iterator();
+  }
+
+  private List<String> readCandidatesThatFitInMemory(Iterator<String> 
candidates) {
+    List<String> result = new ArrayList<>();
+    // Always read at least one. If memory doesn't clean up fast enough, at least
+    // some progress is made.
+    while (candidates.hasNext()) {
+      result.add(candidates.next());
+      if (almostOutOfMemory(Runtime.getRuntime()))
+        break;
+    }
+    return result;
+  }
+
+  private Mutation deleteOldDeleteMutation(final String delete) {
+    Mutation m = new Mutation(OLD_DELETE_PREFIX + delete);
+    m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
+    return m;
+  }
+
+  private boolean almostOutOfMemory(Runtime runtime) {
+    if (runtime.totalMemory() - runtime.freeMemory()
+        > CANDIDATE_MEMORY_PERCENTAGE * runtime.maxMemory()) {
+      log.info("List of delete candidates has exceeded the memory"
+          + " threshold. Attempting to delete what has been gathered so far.");
+      return true;
+    } else
+      return false;
+  }
+
 }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
 
b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
index d28eaaa..ae9936f 100644
--- 
a/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
+++ 
b/test/src/main/java/org/apache/accumulo/test/functional/GarbageCollectorIT.java
@@ -57,6 +57,7 @@ import 
org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl.ProcessInfo;
 import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
 import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
 import org.apache.accumulo.miniclusterImpl.ProcessReference;
+import org.apache.accumulo.server.metadata.ServerAmpleImpl;
 import org.apache.accumulo.test.TestIngest;
 import org.apache.accumulo.test.VerifyIngest;
 import org.apache.accumulo.test.VerifyIngest.VerifyParams;
@@ -213,7 +214,11 @@ public class GarbageCollectorIT extends 
ConfigurableMacBase {
       try (BatchWriter bw = c.createBatchWriter(MetadataTable.NAME)) {
         bw.addMutation(createDelMutation("", "", "", ""));
         bw.addMutation(createDelMutation("", "testDel", "test", "valueTest"));
-        bw.addMutation(createDelMutation("/", "", "", ""));
+        // path is invalid but value is expected - that is the only way the invalid entry will
+        // come through processing and show up to produce the error in the output that allows
+        // the while loop to end
+        bw.addMutation(
+            createDelMutation("/", "", "", 
MetadataSchema.DeletesSection.SkewedKeyValue.STR_NAME));
       }
 
       ProcessInfo gc = cluster.exec(SimpleGarbageCollector.class);
@@ -304,7 +309,8 @@ public class GarbageCollectorIT extends ConfigurableMacBase 
{
       for (int i = 0; i < 100000; ++i) {
         String longpath = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
             + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj";
-        Mutation delFlag = createDelMutation(String.format("/%020d/%s", i, 
longpath), "", "", "");
+        Mutation delFlag = 
ServerAmpleImpl.createDeleteMutation(getServerContext(),
+            MetadataTable.ID, String.format("/%020d/%s", i, longpath));
         bw.addMutation(delFlag);
       }
     }
diff --git 
a/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java 
b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
new file mode 100644
index 0000000..b4b231a
--- /dev/null
+++ 
b/test/src/main/java/org/apache/accumulo/test/upgrade/GCUpgrade9to10TestIT.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.test.upgrade;
+
+import static 
org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.Constants;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.accumulo.core.security.TablePermission;
+import org.apache.accumulo.fate.zookeeper.ZooLock;
+import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.master.upgrade.Upgrader9to10;
+import org.apache.accumulo.minicluster.MemoryUnit;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.miniclusterImpl.ProcessNotFoundException;
+import org.apache.accumulo.test.functional.ConfigurableMacBase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+
+import com.google.common.collect.Iterators;
+
+public class GCUpgrade9to10TestIT extends ConfigurableMacBase {
+  private static final String OUR_SECRET = "itsreallysecret";
+  private static final String OLDDELPREFIX = "~del";
+  private static final Upgrader9to10 upgrader = new Upgrader9to10();
+
+  @Override
+  public int defaultTimeoutSeconds() {
+    return 5 * 60;
+  }
+
+  @Override
+  public void configure(MiniAccumuloConfigImpl cfg, Configuration 
hadoopCoreSite) {
+    cfg.setProperty(Property.INSTANCE_ZK_TIMEOUT, "15s");
+    cfg.setProperty(Property.INSTANCE_SECRET, OUR_SECRET);
+    cfg.setDefaultMemory(64, MemoryUnit.MEGABYTE);
+    cfg.setMemory(ServerType.MASTER, 16, MemoryUnit.MEGABYTE);
+    cfg.setMemory(ServerType.ZOOKEEPER, 32, MemoryUnit.MEGABYTE);
+    cfg.setProperty(Property.GC_CYCLE_START, "1");
+    cfg.setProperty(Property.GC_CYCLE_DELAY, "1");
+    cfg.setProperty(Property.GC_PORT, "0");
+    cfg.setProperty(Property.TSERV_MAXMEM, "5K");
+    cfg.setProperty(Property.TSERV_MAJC_DELAY, "1");
+
+    // use raw local file system so walogs sync and flush will work
+    hadoopCoreSite.set("fs.file.impl", RawLocalFileSystem.class.getName());
+  }
+
+  private void killMacGc() throws ProcessNotFoundException, 
InterruptedException, KeeperException {
+    // kill gc started by MAC
+    getCluster().killProcess(ServerType.GARBAGE_COLLECTOR,
+        
getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR).iterator().next());
+    // delete lock in zookeeper if there, this will allow next GC to start quickly
+    String path = getServerContext().getZooKeeperRoot() + Constants.ZGC_LOCK;
+    ZooReaderWriter zk = new ZooReaderWriter(cluster.getZooKeepers(), 30000, 
OUR_SECRET);
+    try {
+      ZooLock.deleteLock(zk, path);
+    } catch (IllegalStateException e) {
+      log.error("Unable to delete ZooLock for mini accumulo-gc", e);
+    }
+
+    assertNull(getCluster().getProcesses().get(ServerType.GARBAGE_COLLECTOR));
+  }
+
+  @Test
+  public void gcUpgradeRootTableDeletesIT() throws Exception {
+    gcUpgradeDeletesTest(Ample.DataLevel.METADATA, 3);
+  }
+
+  @Test
+  public void gcUpgradeMetadataTableDeletesIT() throws Exception {
+    gcUpgradeDeletesTest(Ample.DataLevel.USER, 3);
+  }
+
+  @Test
+  public void gcUpgradeNoDeletesIT() throws Exception {
+    gcUpgradeDeletesTest(Ample.DataLevel.METADATA, 0);
+
+  }
+
+  /**
+   * This is really hard to make happen - the minicluster can only use so little memory to start up.
+   * The {@link org.apache.accumulo.master.upgrade.Upgrader9to10} CANDIDATE_MEMORY_PERCENTAGE can be
+   * adjusted.
+   */
+  @Test
+  public void gcUpgradeOutofMemoryTest() throws Exception {
+    killMacGc(); // we do not want anything deleted
+
+    int somebignumber = 100000;
+    String longpathname = "aaaaaaaaaabbbbbbbbbbccccccccccddddddddddeeeeeeeeee"
+        + "ffffffffffgggggggggghhhhhhhhhhiiiiiiiiiijjjjjjjjjj"
+        + 
"kkkkkkkkkkkkkkkkkklllllllllllllllllllllmmmmmmmmmmmmmmmmmnnnnnnnnnnnnnnnn";
+    longpathname += longpathname; // make it even longer
+    Ample.DataLevel level = Ample.DataLevel.USER;
+
+    log.info("Filling metadata table with lots of bogus delete flags");
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
+      addEntries(c, level.metaTable(), somebignumber, longpathname);
+
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+      upgrader.upgradeFileDeletes(getServerContext(), level);
+
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+      Range range = MetadataSchema.DeletesSection.getRange();
+      Scanner scanner;
+      try {
+        scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+      scanner.setRange(range);
+      assertEquals(somebignumber, Iterators.size(scanner.iterator()));
+    }
+  }
+
+  private void gcUpgradeDeletesTest(Ample.DataLevel level, int count) throws 
Exception {
+    killMacGc();// we do not want anything deleted
+
+    log.info("Testing delete upgrades for {}", level.metaTable());
+    try (AccumuloClient c = 
Accumulo.newClient().from(getClientProperties()).build()) {
+
+      Map<String,String> expected = addEntries(c, level.metaTable(), count, 
"somefile");
+      Map<String,String> actual = new HashMap<>();
+
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+      upgrader.upgradeFileDeletes(getServerContext(), level);
+      sleepUninterruptibly(1, TimeUnit.SECONDS);
+      Range range = MetadataSchema.DeletesSection.getRange();
+
+      Scanner scanner;
+      try {
+        scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+      scanner.setRange(range);
+      scanner.iterator().forEachRemaining(entry -> {
+        actual.put(entry.getKey().getRow().toString(), 
entry.getValue().toString());
+      });
+
+      assertEquals(expected, actual);
+
+      // ENSURE IDEMPOTENCE - run upgrade again to ensure nothing is changed because there is
+      // nothing to change
+      upgrader.upgradeFileDeletes(getServerContext(), level);
+      try {
+        scanner = c.createScanner(level.metaTable(), Authorizations.EMPTY);
+      } catch (TableNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+      scanner.setRange(range);
+      actual.clear();
+      scanner.iterator().forEachRemaining(entry -> {
+        actual.put(entry.getKey().getRow().toString(), 
entry.getValue().toString());
+      });
+      assertEquals(expected, actual);
+    }
+  }
+
+  private Mutation createOldDelMutation(String path, String cf, String cq, 
String val) {
+    Text row = new Text(OLDDELPREFIX + path);
+    Mutation delFlag = new Mutation(row);
+    delFlag.put(cf, cq, val);
+    return delFlag;
+  }
+
+  private Map<String,String> addEntries(AccumuloClient client, String table, 
int count,
+      String filename) throws Exception {
+    client.securityOperations().grantTablePermission(client.whoami(), table, 
TablePermission.WRITE);
+    Map<String,String> expected = new TreeMap<>();
+    try (BatchWriter bw = client.createBatchWriter(table)) {
+      for (int i = 0; i < count; ++i) {
+        String longpath = String.format("hdfs://localhost:8020/%020d/%s", i, 
filename);
+        Mutation delFlag = createOldDelMutation(longpath, "", "", "");
+        bw.addMutation(delFlag);
+        expected.put(MetadataSchema.DeletesSection.encodeRow(longpath),
+            Upgrader9to10.UPGRADED.toString());
+      }
+      return expected;
+    }
+  }
+
+}

Reply via email to