Merge branch '1.7' into 1.8

Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/bf5b6e0f
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/bf5b6e0f
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/bf5b6e0f

Branch: refs/heads/1.8
Commit: bf5b6e0fae4974bb77b6409c21c5770d815b991f
Parents: 8668c7f 7b9a11a
Author: Keith Turner <ktur...@apache.org>
Authored: Thu Jan 26 19:24:14 2017 -0500
Committer: Keith Turner <ktur...@apache.org>
Committed: Thu Jan 26 19:24:14 2017 -0500

----------------------------------------------------------------------
 .../accumulo/master/FateServiceHandler.java     |  15 +-
 .../master/tableOps/CancelCompactions.java      |  19 ++-
 .../master/tableOps/ChangeTableState.java       |  20 ++-
 .../accumulo/master/tableOps/CompactRange.java  |  30 ++--
 .../master/tableOps/CompactionDriver.java       |  14 +-
 .../accumulo/master/tableOps/DeleteTable.java   |  29 ++--
 .../accumulo/master/tableOps/RenameTable.java   |  19 ++-
 .../accumulo/master/tableOps/TableRangeOp.java  |  20 ++-
 .../master/tableOps/TableRangeOpWait.java       |   9 +-
 .../apache/accumulo/master/tableOps/Utils.java  |   9 +-
 .../functional/ConcurrentDeleteTableIT.java     | 167 ++++++++++++++++++-
 11 files changed, 262 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
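Every server-side hunk below applies the same fix: each FATE table operation now takes the namespace id as a constructor argument and re-resolves it through Utils.getNamespaceId(...) instead of calling Tables.getNamespaceId(...) against a table that a concurrent operation may already have deleted; that stale lookup is what left dangling namespace locks before ACCUMULO-4575. The following is a minimal sketch of that pattern, not code from this commit: the class name ExampleTableOp is hypothetical, while Master, MasterRepo, Repo, Utils, TableOperation, and the Utils method signatures are the ones visible in the hunks below.

    // Hypothetical ExampleTableOp: a condensed sketch of the pattern shared by
    // CancelCompactions, ChangeTableState, CompactRange, DeleteTable, RenameTable,
    // TableRangeOp, etc. in this commit. It does not compile standalone; it assumes
    // the Accumulo master classes referenced in the diff.
    public class ExampleTableOp extends MasterRepo {
      private static final long serialVersionUID = 1L;

      private final String tableId;
      private final String namespaceId;

      ExampleTableOp(String namespaceId, String tableId) {
        // the namespace id is captured when the FATE operation is created...
        this.namespaceId = namespaceId;
        this.tableId = tableId;
      }

      private String getNamespaceId(Master env) throws Exception {
        // ...and later validated/refreshed through Utils rather than looked up from
        // the table, which fails once a concurrent delete removes the table
        return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.MERGE, this.namespaceId);
      }

      @Override
      public long isReady(long tid, Master env) throws Exception {
        // reserve the namespace lock first, then the table lock
        return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.MERGE)
            + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
      }

      @Override
      public Repo<Master> call(long tid, Master env) throws Exception {
        return null; // the real operations do their work here
      }

      @Override
      public void undo(long tid, Master env) throws Exception {
        // release in reverse order: table lock, then namespace lock
        Utils.unreserveTable(tableId, tid, true);
        Utils.unreserveNamespace(getNamespaceId(env), tid, false);
      }
    }

TableRangeOp and TableRangeOpWait in the hunks below follow this shape directly; CompactRange shows the same helper being used when handing off to CompactionDriver.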


http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/FateServiceHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
index d7d5b14,e641479..f3d53a7
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactRange.java
@@@ -24,11 -24,10 +24,10 @@@ import java.util.List
  import org.apache.accumulo.core.Constants;
  import org.apache.accumulo.core.client.IteratorSetting;
  import org.apache.accumulo.core.client.admin.CompactionStrategyConfig;
 +import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
  import org.apache.accumulo.core.client.impl.CompactionStrategyConfigUtil;
- import org.apache.accumulo.core.client.impl.Tables;
  import org.apache.accumulo.core.client.impl.thrift.TableOperation;
  import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
  import org.apache.accumulo.fate.Repo;
  import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
  import org.apache.accumulo.fate.zookeeper.IZooReaderWriter.Mutator;
@@@ -51,9 -51,14 +51,14 @@@ public class CompactRange extends Maste
    private byte[] endRow;
    private byte[] config;
  
-   public CompactRange(String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators, CompactionStrategyConfig compactionStrategy)
-       throws AcceptableThriftTableOperationException {
+   private String getNamespaceId(Master env) throws Exception {
+     return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.COMPACT, this.namespaceId);
+   }
+ 
+   public CompactRange(String namespaceId, String tableId, byte[] startRow, byte[] endRow, List<IteratorSetting> iterators,
 -      CompactionStrategyConfig compactionStrategy) throws ThriftTableOperationException {
++      CompactionStrategyConfig compactionStrategy) throws AcceptableThriftTableOperationException {
  
+     requireNonNull(namespaceId, "Invalid argument: null namespaceId");
      requireNonNull(tableId, "Invalid argument: null tableId");
      requireNonNull(iterators, "Invalid argument: null iterator list");
      requireNonNull(compactionStrategy, "Invalid argument: null compactionStrategy");
@@@ -122,9 -127,9 +127,9 @@@
          }
        });
  
-       return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), tableId, startRow, endRow);
+       return new CompactionDriver(Long.parseLong(new String(cid, UTF_8).split(",")[0]), getNamespaceId(env), tableId, startRow, endRow);
      } catch (NoNodeException nne) {
 -      throw new ThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
 +      throw new AcceptableThriftTableOperationException(tableId, null, TableOperation.COMPACT, TableOperationExceptionType.NOTFOUND, null);
      }
  
    }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/CompactionDriver.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/RenameTable.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
index e2e7018,64d08be..d91755d
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOp.java
@@@ -16,10 -16,9 +16,9 @@@
   */
  package org.apache.accumulo.master.tableOps;
  
 +import org.apache.accumulo.core.client.impl.AcceptableThriftTableOperationException;
- import org.apache.accumulo.core.client.impl.Tables;
  import org.apache.accumulo.core.client.impl.thrift.TableOperation;
  import org.apache.accumulo.core.client.impl.thrift.TableOperationExceptionType;
 -import org.apache.accumulo.core.client.impl.thrift.ThriftTableOperationException;
  import org.apache.accumulo.core.data.impl.KeyExtent;
  import org.apache.accumulo.core.metadata.RootTable;
  import org.apache.accumulo.core.util.TextUtil;
@@@ -42,15 -42,19 +42,19 @@@ public class TableRangeOp extends Maste
    private byte[] endRow;
    private Operation op;
  
-   @Override
-   public long isReady(long tid, Master environment) throws Exception {
-     String namespaceId = Tables.getNamespaceId(environment.getInstance(), tableId);
-     return Utils.reserveNamespace(namespaceId, tid, false, true, TableOperation.MERGE) + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+   private String getNamespaceId(Master env) throws Exception {
+     return Utils.getNamespaceId(env.getInstance(), tableId, TableOperation.MERGE, this.namespaceId);
    }
  
-   public TableRangeOp(MergeInfo.Operation op, String tableId, Text startRow, Text endRow) throws AcceptableThriftTableOperationException {
+   @Override
+   public long isReady(long tid, Master env) throws Exception {
+     return Utils.reserveNamespace(getNamespaceId(env), tid, false, true, TableOperation.MERGE)
+         + Utils.reserveTable(tableId, tid, true, true, TableOperation.MERGE);
+   }
  
 -  public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws ThriftTableOperationException {
++  public TableRangeOp(MergeInfo.Operation op, String namespaceId, String tableId, Text startRow, Text endRow) throws AcceptableThriftTableOperationException {
      this.tableId = tableId;
+     this.namespaceId = namespaceId;
      this.startRow = TextUtil.getBytes(startRow);
      this.endRow = TextUtil.getBytes(endRow);
      this.op = op;
@@@ -85,14 -90,14 +89,14 @@@
  
    @Override
    public void undo(long tid, Master env) throws Exception {
-     String namespaceId = Tables.getNamespaceId(env.getInstance(), tableId);
      // Not sure this is a good thing to do. The Master state engine should be the one to remove it.
 -    Text tableIdText = new Text(tableId);
 -    MergeInfo mergeInfo = env.getMergeInfo(tableIdText);
 +    MergeInfo mergeInfo = env.getMergeInfo(tableId);
      if (mergeInfo.getState() != MergeState.NONE)
        log.info("removing merge information " + mergeInfo);
 -    env.clearMergeState(tableIdText);
 +    env.clearMergeState(tableId);
 +    Utils.unreserveNamespace(namespaceId, tid, false);
      Utils.unreserveTable(tableId, tid, true);
+     Utils.unreserveNamespace(getNamespaceId(env), tid, false);
    }
  
  }

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
----------------------------------------------------------------------
diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
index a7c82b1,5feb06d..1194c67
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/TableRangeOpWait.java
@@@ -57,12 -61,12 +59,11 @@@ class TableRangeOpWait extends MasterRe
  
    @Override
    public Repo<Master> call(long tid, Master master) throws Exception {
-     String namespaceId = Tables.getNamespaceId(master.getInstance(), tableId);
 -    Text tableIdText = new Text(tableId);
 -    MergeInfo mergeInfo = master.getMergeInfo(tableIdText);
 +    MergeInfo mergeInfo = master.getMergeInfo(tableId);
      log.info("removing merge information " + mergeInfo);
 -    master.clearMergeState(tableIdText);
 +    master.clearMergeState(tableId);
-     Utils.unreserveNamespace(namespaceId, tid, false);
      Utils.unreserveTable(tableId, tid, true);
+     Utils.unreserveNamespace(Utils.getNamespaceId(master.getInstance(), tableId, TableOperation.MERGE, this.namespaceId), tid, false);
      return null;
    }
  

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/accumulo/blob/bf5b6e0f/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
----------------------------------------------------------------------
diff --cc test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
index 3f7a305,0000000..5808804
mode 100644,000000..100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ConcurrentDeleteTableIT.java
@@@ -1,147 -1,0 +1,298 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.accumulo.test.functional;
 +
 +import java.util.ArrayList;
++import java.util.Collections;
 +import java.util.List;
++import java.util.Map;
 +import java.util.Random;
++import java.util.Set;
 +import java.util.TreeSet;
 +import java.util.concurrent.CountDownLatch;
 +import java.util.concurrent.ExecutorService;
 +import java.util.concurrent.Executors;
 +import java.util.concurrent.Future;
 +
 +import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.AccumuloException;
 +import org.apache.accumulo.core.client.AccumuloSecurityException;
 +import org.apache.accumulo.core.client.BatchWriter;
 +import org.apache.accumulo.core.client.BatchWriterConfig;
 +import org.apache.accumulo.core.client.Connector;
 +import org.apache.accumulo.core.client.Instance;
 +import org.apache.accumulo.core.client.MutationsRejectedException;
 +import org.apache.accumulo.core.client.TableNotFoundException;
++import org.apache.accumulo.core.client.TableOfflineException;
++import org.apache.accumulo.core.client.admin.CompactionConfig;
 +import org.apache.accumulo.core.conf.Property;
 +import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.security.Authorizations;
 +import org.apache.accumulo.core.zookeeper.ZooUtil;
 +import org.apache.accumulo.fate.AdminUtil;
 +import org.apache.accumulo.fate.AdminUtil.FateStatus;
 +import org.apache.accumulo.fate.ZooStore;
 +import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.harness.AccumuloClusterHarness;
 +import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
 +import org.apache.hadoop.io.Text;
 +import org.apache.zookeeper.KeeperException;
 +import org.junit.Assert;
 +import org.junit.Test;
 +
 +public class ConcurrentDeleteTableIT extends AccumuloClusterHarness {
 +
 +  @Test
 +  public void testConcurrentDeleteTablesOps() throws Exception {
 +    final Connector c = getConnector();
 +    String[] tables = getUniqueNames(2);
 +
-     TreeSet<Text> splits = new TreeSet<>();
- 
-     for (int i = 0; i < 1000; i++) {
-       Text split = new Text(String.format("%09x", i * 100000));
-       splits.add(split);
-     }
++    TreeSet<Text> splits = createSplits();
 +
 +    ExecutorService es = Executors.newFixedThreadPool(20);
 +
 +    int count = 0;
 +    for (final String table : tables) {
 +      c.tableOperations().create(table);
 +      c.tableOperations().addSplits(table, splits);
 +      writeData(c, table);
 +      if (count == 1) {
 +        c.tableOperations().flush(table, null, null, true);
 +      }
 +      count++;
 +
-       final CountDownLatch cdl = new CountDownLatch(20);
++      int numDeleteOps = 20;
++      final CountDownLatch cdl = new CountDownLatch(numDeleteOps);
 +
 +      List<Future<?>> futures = new ArrayList<>();
 +
-       for (int i = 0; i < 20; i++) {
++      for (int i = 0; i < numDeleteOps; i++) {
 +        Future<?> future = es.submit(new Runnable() {
 +
 +          @Override
 +          public void run() {
 +            try {
 +              cdl.countDown();
 +              cdl.await();
 +              c.tableOperations().delete(table);
 +            } catch (TableNotFoundException e) {
 +              // expected
 +            } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
 +              throw new RuntimeException(e);
 +            }
 +          }
 +        });
 +
 +        futures.add(future);
 +      }
 +
 +      for (Future<?> future : futures) {
 +        future.get();
 +      }
 +
 +      try {
 +        c.createScanner(table, Authorizations.EMPTY);
 +        Assert.fail("Expected table " + table + " to be gone.");
 +      } catch (TableNotFoundException tnfe) {
 +        // expected
 +      }
 +
 +      FateStatus fateStatus = getFateStatus();
 +
 +      // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
 +      Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
 +      Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
 +    }
 +
 +    es.shutdown();
 +  }
 +
++  private TreeSet<Text> createSplits() {
++    TreeSet<Text> splits = new TreeSet<>();
++
++    for (int i = 0; i < 1000; i++) {
++      Text split = new Text(String.format("%09x", i * 100000));
++      splits.add(split);
++    }
++    return splits;
++  }
++
++  private static abstract class DelayedTableOp implements Runnable {
++    private CountDownLatch cdl;
++
++    DelayedTableOp(CountDownLatch cdl) {
++      this.cdl = cdl;
++    }
++
++    public void run() {
++      try {
++        cdl.countDown();
++        cdl.await();
++        Thread.sleep(10);
++        doTableOp();
++      } catch (TableNotFoundException e) {
++        // expected
++      } catch (RuntimeException e) {
++        throw e;
++      } catch (Exception e) {
++        throw new RuntimeException(e);
++      }
++    }
++
++    protected abstract void doTableOp() throws Exception;
++  }
++
++  @Test
++  public void testConcurrentFateOpsWithDelete() throws Exception {
++    final Connector c = getConnector();
++    String[] tables = getUniqueNames(2);
++
++    TreeSet<Text> splits = createSplits();
++
++    int numOperations = 8;
++
++    ExecutorService es = Executors.newFixedThreadPool(numOperations);
++
++    int count = 0;
++    for (final String table : tables) {
++      c.tableOperations().create(table);
++      c.tableOperations().addSplits(table, splits);
++      writeData(c, table);
++      if (count == 1) {
++        c.tableOperations().flush(table, null, null, true);
++      }
++      count++;
++
++      // increment this for each test
++      final CountDownLatch cdl = new CountDownLatch(numOperations);
++
++      List<Future<?>> futures = new ArrayList<>();
++
++      futures.add(es.submit(new Runnable() {
++        @Override
++        public void run() {
++          try {
++            cdl.countDown();
++            cdl.await();
++            c.tableOperations().delete(table);
++          } catch (TableNotFoundException | TableOfflineException e) {
++            // expected
++            } catch (InterruptedException | AccumuloException | AccumuloSecurityException e) {
++            throw new RuntimeException(e);
++          }
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          c.tableOperations().compact(table, new CompactionConfig());
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          c.tableOperations().merge(table, null, null);
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          Map<String,String> m = Collections.emptyMap();
++          Set<String> s = Collections.emptySet();
++          c.tableOperations().clone(table, table + "_clone", true, m, s);
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          c.tableOperations().deleteRows(table, null, null);
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          c.tableOperations().cancelCompaction(table);
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          c.tableOperations().rename(table, table + "_renamed");
++        }
++      }));
++
++      futures.add(es.submit(new DelayedTableOp(cdl) {
++        @Override
++        protected void doTableOp() throws Exception {
++          c.tableOperations().offline(table);
++        }
++      }));
++
++      Assert.assertEquals(numOperations, futures.size());
++
++      for (Future<?> future : futures) {
++        future.get();
++      }
++
++      try {
++        c.createScanner(table, Authorizations.EMPTY);
++        Assert.fail("Expected table " + table + " to be gone.");
++      } catch (TableNotFoundException tnfe) {
++        // expected
++      }
++
++      FateStatus fateStatus = getFateStatus();
++
++      // ensure there are no dangling locks... before ACCUMULO-4575 was fixed concurrent delete tables could fail and leave dangling locks.
++      Assert.assertEquals(0, fateStatus.getDanglingHeldLocks().size());
++      Assert.assertEquals(0, fateStatus.getDanglingWaitingLocks().size());
++    }
++
++    es.shutdown();
++  }
++
 +  private FateStatus getFateStatus() throws KeeperException, InterruptedException {
 +    Instance instance = getConnector().getInstance();
 +    AdminUtil<String> admin = new AdminUtil<>(false);
 +    String secret = getCluster().getSiteConfiguration().get(Property.INSTANCE_SECRET);
 +    IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 +    ZooStore<String> zs = new ZooStore<String>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
 +    FateStatus fateStatus = admin.getStatus(zs, zk, ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS, null, null);
 +    return fateStatus;
 +  }
 +
 +  private void writeData(Connector c, String table) throws TableNotFoundException, MutationsRejectedException {
 +    BatchWriter bw = c.createBatchWriter(table, new BatchWriterConfig());
 +    try {
 +      Random rand = new Random();
 +      for (int i = 0; i < 1000; i++) {
 +        Mutation m = new Mutation(String.format("%09x", rand.nextInt(100000 * 1000)));
 +        m.put("m", "order", "" + i);
 +        bw.addMutation(m);
 +      }
 +    } finally {
 +      bw.close();
 +    }
 +  }
 +}
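The new IT above drives the race through AccumuloClusterHarness with twenty threads; the same idea can be sketched against any existing Connector using only the public client API calls that appear in the test. Everything below is illustrative rather than part of the commit, and conn and tableName are placeholder names.

    // Minimal sketch: race a delete against a compaction on one table and treat
    // TableNotFoundException as the expected outcome for the loser of the race.
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    import org.apache.accumulo.core.client.Connector;
    import org.apache.accumulo.core.client.TableNotFoundException;
    import org.apache.accumulo.core.client.admin.CompactionConfig;

    public class DeleteRaceSketch {
      public static void race(final Connector conn, final String tableName) throws Exception {
        final CountDownLatch start = new CountDownLatch(2);
        ExecutorService es = Executors.newFixedThreadPool(2);

        Future<?> delete = es.submit(new Runnable() {
          @Override
          public void run() {
            try {
              start.countDown();
              start.await();
              conn.tableOperations().delete(tableName);
            } catch (TableNotFoundException e) {
              // expected if another operation already removed the table
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }
        });

        Future<?> compact = es.submit(new Runnable() {
          @Override
          public void run() {
            try {
              start.countDown();
              start.await();
              conn.tableOperations().compact(tableName, new CompactionConfig());
            } catch (TableNotFoundException e) {
              // expected when the delete wins the race
            } catch (Exception e) {
              throw new RuntimeException(e);
            }
          }
        });

        delete.get();
        compact.get();
        es.shutdown();
      }
    }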
