This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 8f96071472f7d60a4f59d93b0fc03eaf5f4515b8
Merge: c7bc980 2b8243b
Author: Christopher Tubbs <ctubb...@apache.org>
AuthorDate: Thu Nov 7 18:15:50 2019 -0500

    Merge branch '1.9'

 .../clientImpl/AccumuloBulkMergeException.java     |   1 +
 .../test/functional/FateConcurrencyIT.java         | 354 ++++++---------------
 .../org/apache/accumulo/test/util/SlowOps.java     | 347 ++++++++++++++++++++
 3 files changed, 438 insertions(+), 264 deletions(-)

diff --cc core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
index 2a7527e,0000000..98d7015
mode 100644,000000..100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/AccumuloBulkMergeException.java
@@@ -1,32 -1,0 +1,33 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one or more
 + * contributor license agreements.  See the NOTICE file distributed with
 + * this work for additional information regarding copyright ownership.
 + * The ASF licenses this file to You under the Apache License, Version 2.0
 + * (the "License"); you may not use this file except in compliance with
 + * the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.accumulo.core.clientImpl;
 +
 +import org.apache.accumulo.core.client.AccumuloException;
 +
 +/**
 + * Internal class indicating a concurrent merge occurred during the new bulk import.
 + */
 +public class AccumuloBulkMergeException extends AccumuloException {
 +
++  private static final long serialVersionUID = 1L;
 +  private static final String MSG = "Concurrent merge happened";
 +
 +  public AccumuloBulkMergeException(final Throwable cause) {
 +    super(MSG, cause);
 +  }
 +
 +}
diff --cc test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 8367891,c3a4d79..47a315a
--- a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@@ -33,34 -30,22 +30,25 @@@ import java.util.concurrent.Future
  import java.util.concurrent.TimeUnit;
  
  import org.apache.accumulo.core.Constants;
 +import org.apache.accumulo.core.client.Accumulo;
 +import org.apache.accumulo.core.client.AccumuloClient;
  import org.apache.accumulo.core.client.AccumuloException;
  import org.apache.accumulo.core.client.AccumuloSecurityException;
- import org.apache.accumulo.core.client.BatchWriter;
- import org.apache.accumulo.core.client.IteratorSetting;
- import org.apache.accumulo.core.client.Scanner;
- import org.apache.accumulo.core.client.TableExistsException;
 -import org.apache.accumulo.core.client.Connector;
 -import org.apache.accumulo.core.client.Instance;
  import org.apache.accumulo.core.client.TableNotFoundException;
 -import org.apache.accumulo.core.client.impl.Tables;
 +import org.apache.accumulo.core.clientImpl.ClientContext;
- import org.apache.accumulo.core.clientImpl.ClientInfo;
 +import org.apache.accumulo.core.clientImpl.Tables;
  import org.apache.accumulo.core.conf.Property;
- import org.apache.accumulo.core.data.Key;
- import org.apache.accumulo.core.data.Mutation;
 +import org.apache.accumulo.core.data.TableId;
- import org.apache.accumulo.core.data.Value;
  import org.apache.accumulo.core.master.state.tables.TableState;
- import org.apache.accumulo.core.security.Authorizations;
 -import org.apache.accumulo.core.zookeeper.ZooUtil;
  import org.apache.accumulo.fate.AdminUtil;
  import org.apache.accumulo.fate.ZooStore;
  import org.apache.accumulo.fate.zookeeper.IZooReaderWriter;
 +import org.apache.accumulo.fate.zookeeper.ZooUtil;
  import org.apache.accumulo.harness.AccumuloClusterHarness;
  import org.apache.accumulo.server.zookeeper.ZooReaderWriterFactory;
- import org.apache.hadoop.io.Text;
+ import org.apache.accumulo.test.util.SlowOps;
  import org.apache.zookeeper.KeeperException;
 +import org.junit.After;
  import org.junit.AfterClass;
  import org.junit.Before;
  import org.junit.Test;
@@@ -85,8 -70,7 +73,8 @@@ public class FateConcurrencyIT extends 
    private static final int NUM_ROWS = 1000;
    private static final long SLOW_SCAN_SLEEP_MS = 250L;
  
-   private AccumuloClient accumuloClient;
 -  private Connector connector;
++  private AccumuloClient client;
 +  private ClientContext context;
  
    private static final ExecutorService pool = Executors.newCachedThreadPool();
  
@@@ -101,19 -84,16 +88,22 @@@
  
    @Before
    public void setup() {
-     accumuloClient = Accumulo.newClient().from(getClientProps()).build();
-     context = (ClientContext) accumuloClient;
+ 
 -    connector = getConnector();
++    client = Accumulo.newClient().from(getClientProps()).build();
++    context = (ClientContext) client;
  
      tableName = getUniqueNames(1)[0];
  
      secret = cluster.getSiteConfiguration().get(Property.INSTANCE_SECRET);
  
-     createData(tableName);
+     maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
+ 
 -    slowOps = new SlowOps(connector, tableName, maxWait, 1);
++    slowOps = new SlowOps(client, tableName, maxWait, 1);
 +  }
 +
 +  @After
 +  public void closeClient() {
-     accumuloClient.close();
++    client.close();
    }
  
    @AfterClass
@@@ -154,7 -134,7 +144,7 @@@
  
      // verify that offline then online functions as expected.
  
-     accumuloClient.tableOperations().offline(tableName, true);
 -    connector.tableOperations().offline(tableName, true);
++    client.tableOperations().offline(tableName, true);
      assertEquals("verify table is offline", TableState.OFFLINE, getTableState(tableName));
  
      onlineOp = new OnLineCallable(tableName);
@@@ -187,11 -166,10 +176,10 @@@
  
      assertEquals("verify table is still online", TableState.ONLINE, getTableState(tableName));
  
-     assertTrue("verify compaction still running and fate transaction still exists",
-         blockUntilCompactionRunning(tableName));
+     assertTrue("Find FATE operation for table", findFate(tableName));
  
      // test complete, cancel compaction and move on.
-     accumuloClient.tableOperations().cancelCompaction(tableName);
 -    connector.tableOperations().cancelCompaction(tableName);
++    client.tableOperations().cancelCompaction(tableName);
  
      log.debug("Success: Timing results for online commands.");
      log.debug("Time for unblocked online {} ms",
@@@ -214,14 -215,9 +225,8 @@@
    @Test
    public void getFateStatus() {
  
 -    Instance instance = connector.getInstance();
 -    String tableId;
 +    TableId tableId;
  
-     // for development testing - force transient condition that was failing this test so that
-     // we know if multiple compactions are running, they are properly handled by the test code.
-     if (runMultipleCompactions) {
-       runMultipleCompactions();
-     }
- 
      try {
  
        assertEquals("verify table online after created", TableState.ONLINE,
@@@ -249,14 -245,13 +254,13 @@@
  
        try {
  
-         String instanceId = accumuloClient.instanceOperations().getInstanceID();
-         ClientInfo info = ClientInfo.from(accumuloClient.properties());
-         IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(info.getZooKeepers(),
-             info.getZooKeepersSessionTimeOut(), secret);
++        String instanceId = context.getInstanceID();
+         IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
 -            instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 -
 -        ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
++            context.getZooKeepers(), context.getZooKeepersSessionTimeOut(), secret);
 +        ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk);
  
          withLocks = admin.getStatus(zs, zk,
 -            ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
 +            ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
  
          // call method that does not use locks.
          noLocks = admin.getTransactionStatus(zs, null, null);
@@@ -306,111 -301,15 +310,15 @@@
      try {
  
        // test complete, cancel compaction and move on.
-       accumuloClient.tableOperations().cancelCompaction(tableName);
 -      connector.tableOperations().cancelCompaction(tableName);
++      client.tableOperations().cancelCompaction(tableName);
  
        // block if compaction still running
-       compactTask.get();
- 
-     } catch (InterruptedException ex) {
-       Thread.currentThread().interrupt();
-     } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException
-         | ExecutionException ex) {
-       log.debug("Could not cancel compaction", ex);
-     }
-   }
- 
-   /**
-    * This method was helpful for debugging a condition that was causing transient test failures.
-    * This forces a condition that the test should be able to handle. This method is not needed
-    * during normal testing, it was kept to aid future test development / troubleshooting if other
-    * transient failures occur.
-    */
-   private void runMultipleCompactions() {
- 
-     for (int i = 0; i < 4; i++) {
- 
-       String aTableName = getUniqueNames(1)[0] + "_" + i;
- 
-       createData(aTableName);
- 
-       log.debug("Table: {}", aTableName);
- 
-       pool.submit(new SlowCompactionRunner(aTableName));
- 
-       assertTrue("verify that compaction running and fate transaction exists",
-           blockUntilCompactionRunning(aTableName));
- 
-     }
-   }
- 
-   /**
-    * Create and run a slow running compaction task. The method will block until the compaction has
-    * been started.
-    *
-    * @return a reference to the running compaction task.
-    */
-   private Future<?> startCompactTask() {
-     Future<?> compactTask = pool.submit(new SlowCompactionRunner(tableName));
-     assertTrue("verify that compaction running and fate transaction exists",
-         blockUntilCompactionRunning(tableName));
-     return compactTask;
-   }
- 
-   /**
-    * Blocks current thread until compaction is running.
-    *
-    * @return true if compaction and associate fate found.
-    */
-   private boolean blockUntilCompactionRunning(final String tableName) {
- 
-     long maxWait = defaultTimeoutSeconds() <= 0 ? 60_000 : ((defaultTimeoutSeconds() * 1000) / 2);
- 
-     long startWait = System.currentTimeMillis();
- 
-     List<String> tservers = accumuloClient.instanceOperations().getTabletServers();
- 
-     /*
-      * wait for compaction to start on table - The compaction will acquire a fate transaction lock
-      * that used to block a subsequent online command while the fate transaction lock was held.
-      */
-     while (System.currentTimeMillis() < (startWait + maxWait)) {
- 
-       try {
- 
-         int runningCompactions = 0;
- 
-         for (String tserver : tservers) {
-           runningCompactions +=
-               accumuloClient.instanceOperations().getActiveCompactions(tserver).size();
-           log.trace("tserver {}, running compactions {}", tservers, runningCompactions);
-         }
- 
-         if (runningCompactions > 0) {
-           // Validate that there is a compaction fate transaction - otherwise test is invalid.
-           if (findFate(tableName)) {
-             return true;
-           }
-         }
- 
-       } catch (AccumuloSecurityException | AccumuloException ex) {
-         throw new IllegalStateException("failed to get active compactions, test fails.", ex);
-       } catch (KeeperException ex) {
-         log.trace("Saw possible transient zookeeper error");
-       }
+       boolean cancelled = slowOps.blockWhileCompactionRunning();
+       log.debug("Cancel completed successfully: {}", cancelled);
  
-       try {
-         Thread.sleep(250);
-       } catch (InterruptedException ex) {
-         // reassert interrupt
-         Thread.currentThread().interrupt();
-       }
+     } catch (TableNotFoundException | AccumuloSecurityException | AccumuloException ex) {
+       log.debug("Could not cancel compaction due to exception", ex);
      }
- 
-     log.debug("Could not find compaction for {} after {} seconds", tableName,
-         TimeUnit.MILLISECONDS.toSeconds(maxWait));
- 
-     return false;
- 
    }
  
    /**
@@@ -428,8 -327,9 +336,8 @@@
     * @throws KeeperException
     *           if a zookeeper error occurred - allows for retries.
     */
-   private boolean findFate(final String tableName) throws KeeperException {
+   private boolean lookupFateInZookeeper(final String tableName) throws KeeperException {
  
 -    Instance instance = connector.getInstance();
      AdminUtil<String> admin = new AdminUtil<>(false);
  
      try {
@@@ -438,16 -338,11 +346,12 @@@
  
        log.trace("tid: {}", tableId);
  
-       ClientInfo info = ClientInfo.from(accumuloClient.properties());
-       IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(info.getZooKeepers(),
-           info.getZooKeepersSessionTimeOut(), secret);
-       ZooStore<String> zs = new ZooStore<>(
-           ZooUtil.getRoot(accumuloClient.instanceOperations().getInstanceID()) + Constants.ZFATE,
-           zk);
 -      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(
 -          instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut(), secret);
 -      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instance) + Constants.ZFATE, zk);
++      String instanceId = context.getInstanceID();
++      IZooReaderWriter zk = new ZooReaderWriterFactory().getZooReaderWriter(context.getZooKeepers(),
++          context.getZooKeepersSessionTimeOut(), secret);
++      ZooStore<String> zs = new ZooStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE, zk);
        AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk,
-           ZooUtil.getRoot(accumuloClient.instanceOperations().getInstanceID())
-               + Constants.ZTABLE_LOCKS + "/" + tableId,
-           null, null);
 -          ZooUtil.getRoot(instance) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
++          ZooUtil.getRoot(instanceId) + Constants.ZTABLE_LOCKS + "/" + tableId, null, null);
  
        log.trace("current fates: {}", fateStatus.getTransactions().size());
  
@@@ -614,7 -456,7 +465,7 @@@
  
        log.trace("Setting {} online", tableName);
  
-       accumuloClient.tableOperations().online(tableName, true);
 -      connector.tableOperations().online(tableName, true);
++      client.tableOperations().online(tableName, true);
        // stop timing
        status.setComplete();
  
@@@ -626,73 -468,48 +477,48 @@@
    }
  
    /**
-    * Instance to create / run a compaction using a slow iterator.
+    * Concurrency testing - ensure that tests are valid if multiple compactions are running. For
+    * development testing - force transient condition that was failing this test so that we know if
+    * multiple compactions are running, they are properly handled by the test code and the tests are
+    * valid.
     */
-   private class SlowCompactionRunner implements Runnable {
- 
-     private final String tableName;
+   @Test
+   public void multipleCompactions() {
  
-     /**
-      * Create an instance of this class.
-      *
-      * @param tableName
-      *          the name of the table that will be compacted with the slow iterator.
-      */
-     SlowCompactionRunner(final String tableName) {
-       this.tableName = tableName;
-     }
+     int tableCount = 4;
  
-     @Override
-     public void run() {
+     List<SlowOps> tables = new ArrayList<>();
  
-       long startTimestamp = System.nanoTime();
+     for (int i = 0; i < tableCount; i++) {
+       String uniqueName = getUniqueNames(1)[0] + "_" + i;
 -      SlowOps gen = new SlowOps(connector, uniqueName, maxWait, tableCount);
++      SlowOps gen = new SlowOps(client, uniqueName, maxWait, tableCount);
+       tables.add(gen);
+       gen.startCompactTask();
+     }
  
-       IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
-       SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+     int foundCount = 0;
  
-       List<IteratorSetting> compactIterators = new ArrayList<>();
-       compactIterators.add(slow);
+     for (SlowOps t : tables) {
+       log.debug("Look for fate {}", t.getTableName());
+       if (findFate(t.getTableName())) {
+         log.debug("Found fate {}", t.getTableName());
+         foundCount++;
+       }
+     }
  
-       log.trace("Slow iterator {}", slow);
+     assertEquals(tableCount, foundCount);
  
+     for (SlowOps t : tables) {
        try {
- 
-         log.trace("Start compaction");
- 
-         accumuloClient.tableOperations().compact(tableName, new Text("0"), new Text("z"),
-             compactIterators, true, true);
- 
-         log.trace("Compaction wait is complete");
- 
-         log.trace("Slow compaction of {} rows took {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
-             .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
- 
-         // validate that number of rows matches expected.
- 
-         startTimestamp = System.nanoTime();
- 
-         // validate expected data created and exists in table.
- 
-         int count = scanCount(tableName);
- 
-         log.trace("After compaction, scan time for {} rows {} ms", NUM_ROWS, TimeUnit.MILLISECONDS
-             .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
- 
-         if (count != NUM_ROWS) {
-           throw new IllegalStateException(
-               String.format("After compaction, number of rows %1$d does not match expected %2$d",
-                   count, NUM_ROWS));
-         }
-       } catch (TableNotFoundException ex) {
-         throw new IllegalStateException("test failed, table " + tableName + " does not exist", ex);
-       } catch (AccumuloSecurityException ex) {
-         throw new IllegalStateException(
-             "test failed, could not add iterator due to security exception", ex);
-       } catch (AccumuloException ex) {
-         // test cancels compaction on complete, so ignore it as an exception.
-         if (!ex.getMessage().contains("Compaction canceled")) {
-           throw new IllegalStateException("test failed with an Accumulo exception", ex);
 -        connector.tableOperations().cancelCompaction(t.getTableName());
++        client.tableOperations().cancelCompaction(t.getTableName());
+         // block if compaction still running
+         boolean cancelled = t.blockWhileCompactionRunning();
+         if (!cancelled) {
+           log.info("Failed to cancel compaction during multiple compaction test clean-up for {}",
+               t.getTableName());
          }
+       } catch (AccumuloSecurityException | TableNotFoundException | AccumuloException ex) {
+         log.debug("Exception thrown during multiple table test clean-up", ex);
        }
      }
    }
diff --cc test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
index 0000000,bd51990..5e28a06
mode 000000,100644..100644
--- a/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
+++ b/test/src/main/java/org/apache/accumulo/test/util/SlowOps.java
@@@ -1,0 -1,347 +1,347 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ package org.apache.accumulo.test.util;
+ 
+ import static java.nio.charset.StandardCharsets.UTF_8;
+ 
+ import java.util.ArrayList;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.ExecutorService;
+ import java.util.concurrent.Executors;
+ import java.util.concurrent.Future;
+ import java.util.concurrent.TimeUnit;
+ 
++import org.apache.accumulo.core.client.AccumuloClient;
+ import org.apache.accumulo.core.client.AccumuloException;
+ import org.apache.accumulo.core.client.AccumuloSecurityException;
+ import org.apache.accumulo.core.client.BatchWriter;
+ import org.apache.accumulo.core.client.BatchWriterConfig;
 -import org.apache.accumulo.core.client.Connector;
+ import org.apache.accumulo.core.client.IteratorSetting;
+ import org.apache.accumulo.core.client.Scanner;
+ import org.apache.accumulo.core.client.TableExistsException;
+ import org.apache.accumulo.core.client.TableNotFoundException;
+ import org.apache.accumulo.core.client.admin.ActiveCompaction;
+ import org.apache.accumulo.core.data.Key;
+ import org.apache.accumulo.core.data.Mutation;
+ import org.apache.accumulo.core.data.Value;
+ import org.apache.accumulo.core.security.Authorizations;
+ import org.apache.accumulo.test.functional.SlowIterator;
+ import org.apache.hadoop.io.Text;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ /**
+  * Common methods for performing operations that deliberately take some period of time so that
+  * tests can interact while the operations are in progress.
+  */
+ public class SlowOps {
+ 
+   private static final Logger log = LoggerFactory.getLogger(SlowOps.class);
+ 
+   private static final String TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX =
+       "tserver.compaction.major.concurrent.max";
+ 
+   private static final long SLOW_SCAN_SLEEP_MS = 250L;
+   private static final int NUM_DATA_ROWS = 1000;
+ 
 -  private final Connector connector;
++  private final AccumuloClient client;
+   private final String tableName;
+   private final long maxWait;
+ 
+   // private final int numRows = DEFAULT_NUM_DATA_ROWS;
+ 
+   private static final ExecutorService pool = Executors.newCachedThreadPool();
+ 
+   private Future<?> compactTask = null;
+ 
 -  private SlowOps(final Connector connector, final String tableName, final long maxWait) {
++  private SlowOps(final AccumuloClient client, final String tableName, final long maxWait) {
+ 
 -    this.connector = connector;
++    this.client = client;
+     this.tableName = tableName;
+     this.maxWait = maxWait;
+ 
+     createData();
+   }
+ 
 -  public SlowOps(final Connector connector, final String tableName, final long maxWait,
++  public SlowOps(final AccumuloClient client, final String tableName, final long maxWait,
+       final int numParallelExpected) {
+ 
 -    this(connector, tableName, maxWait);
++    this(client, tableName, maxWait);
+ 
+     setExpectedCompactions(numParallelExpected);
+ 
+   }
+ 
+   public void setExpectedCompactions(final int numParallelExpected) {
+ 
+     final int target = numParallelExpected + 1;
+ 
+     Map<String,String> sysConfig;
+ 
+     try {
+ 
 -      sysConfig = connector.instanceOperations().getSystemConfiguration();
++      sysConfig = client.instanceOperations().getSystemConfiguration();
+ 
+       int current = Integer.parseInt(sysConfig.get("tserver.compaction.major.concurrent.max"));
+ 
+       if (current < target) {
 -        connector.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX,
++        client.instanceOperations().setProperty(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX,
+             Integer.toString(target));
+ 
 -        sysConfig = connector.instanceOperations().getSystemConfiguration();
++        sysConfig = client.instanceOperations().getSystemConfiguration();
+ 
+       }
+ 
+       Integer.parseInt(sysConfig.get(TSERVER_COMPACTION_MAJOR_CONCURRENT_MAX));
+ 
+     } catch (AccumuloException | AccumuloSecurityException | NumberFormatException ex) {
+       throw new IllegalStateException("Could not set parallel compaction limit to " + target, ex);
+     }
+   }
+ 
+   public String getTableName() {
+     return tableName;
+   }
+ 
+   private void createData() {
+ 
+     try {
+ 
+       // create table.
 -      connector.tableOperations().create(tableName);
++      client.tableOperations().create(tableName);
+ 
+       log.info("Created table id: {}, name \'{}\'",
 -          connector.tableOperations().tableIdMap().get(tableName), tableName);
++          client.tableOperations().tableIdMap().get(tableName), tableName);
+ 
 -      try (BatchWriter bw = connector.createBatchWriter(tableName, new BatchWriterConfig())) {
++      try (BatchWriter bw = client.createBatchWriter(tableName, new BatchWriterConfig())) {
+         // populate
+         for (int i = 0; i < NUM_DATA_ROWS; i++) {
+           Mutation m = new Mutation(new Text(String.format("%05d", i)));
+           m.put(new Text("col" + ((i % 3) + 1)), new Text("qual"),
+               new Value("junk".getBytes(UTF_8)));
+           bw.addMutation(m);
+         }
+       }
+ 
+       verifyRows();
+ 
+     } catch (AccumuloException | AccumuloSecurityException | TableNotFoundException
+         | TableExistsException ex) {
+       throw new IllegalStateException("Create data failed with exception", ex);
+     }
+   }
+ 
+   private void verifyRows() {
+ 
+     long startTimestamp = System.nanoTime();
+ 
+     int count = scanCount();
+ 
+     log.trace("Scan time for {} rows {} ms", NUM_DATA_ROWS,
+         TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+ 
+     if (count != NUM_DATA_ROWS) {
+       throw new IllegalStateException(
+           String.format("Number of rows %1$d does not match expected %2$d", count, NUM_DATA_ROWS));
+     }
+   }
+ 
+   private int scanCount() {
 -    try (Scanner scanner = connector.createScanner(tableName, Authorizations.EMPTY)) {
++    try (Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY)) {
+ 
+       int count = 0;
+ 
+       for (Map.Entry<Key,Value> elt : scanner) {
+         String expected = String.format("%05d", count);
+         assert (elt.getKey().getRow().toString().equals(expected));
+         count++;
+       }
+       return count;
+     } catch (TableNotFoundException ex) {
+       log.debug("cannot verify row count, table \'{}\' does not exist", tableName);
+       throw new IllegalStateException(ex);
+     }
+   }
+ 
+   /**
+    * Create and run a slow running compaction task. The method will block until the compaction has
+    * been started. The compaction should be cancelled using Accumulo tableOps, and then the caller
+    * can use blockWhileCompactionRunning() on the instance of this class.
+    */
+   public void startCompactTask() {
+ 
+     compactTask = pool.submit(new SlowCompactionRunner());
+ 
+     if (!blockUntilCompactionRunning()) {
+       throw new IllegalStateException("Compaction could not be started for " + tableName);
+     }
+   }
+ 
+   /**
+    * Instance to create / run a compaction using a slow iterator.
+    */
+   private class SlowCompactionRunner implements Runnable {
+ 
+     SlowCompactionRunner() {}
+ 
+     @Override
+     public void run() {
+ 
+       long startTimestamp = System.nanoTime();
+ 
+       IteratorSetting slow = new IteratorSetting(30, "slow", SlowIterator.class);
+       SlowIterator.setSleepTime(slow, SLOW_SCAN_SLEEP_MS);
+ 
+       List<IteratorSetting> compactIterators = new ArrayList<>();
+       compactIterators.add(slow);
+ 
+       log.trace("Starting slow operation using iterator: {}", slow);
+ 
+       int retry = 0;
+       boolean completed = false;
+ 
+       while (!completed && retry++ < 5) {
+ 
+         try {
+           log.info("Starting compaction.  Attempt {}", retry);
 -          connector.tableOperations().compact(tableName, null, null, compactIterators, true, true);
++          client.tableOperations().compact(tableName, null, null, compactIterators, true, true);
+           completed = true;
+         } catch (Throwable ex) {
+           // test cancels compaction on complete, so ignore it as an exception.
+           if (ex.getMessage().contains("Compaction canceled")) {
+             return;
+           }
+           log.info("Exception thrown while waiting for compaction - will retry", ex);
+           try {
+             Thread.sleep(10_000 * retry);
+           } catch (InterruptedException iex) {
+             Thread.currentThread().interrupt();
+             return;
+           }
+         }
+       }
+       log.debug("Compaction wait is complete");
+ 
+       log.trace("Slow compaction of {} rows took {} ms", NUM_DATA_ROWS, TimeUnit.MILLISECONDS
+           .convert((System.nanoTime() - startTimestamp), TimeUnit.NANOSECONDS));
+ 
+       // validate that number of rows matches expected.
+ 
+       startTimestamp = System.nanoTime();
+ 
+       // validate expected data created and exists in table.
+ 
+       int count = scanCount();
+ 
+       log.trace("After compaction, scan time for {} rows {} ms", NUM_DATA_ROWS,
+           TimeUnit.MILLISECONDS.convert((System.nanoTime() - startTimestamp),
+               TimeUnit.NANOSECONDS));
+ 
+       if (count != NUM_DATA_ROWS) {
+         throw new IllegalStateException(
+             String.format("After compaction, number of rows %1$d does not match expected %2$d",
+                 count, NUM_DATA_ROWS));
+       }
+     }
+   }
+ 
+   /**
+    * Blocks current thread until compaction is running.
+    *
+    * @return true if compaction and associate fate found.
+    */
+   private boolean blockUntilCompactionRunning() {
+ 
+     long startWait = System.currentTimeMillis();
+ 
 -    List<String> tservers = connector.instanceOperations().getTabletServers();
++    List<String> tservers = client.instanceOperations().getTabletServers();
+ 
+     /*
+      * wait for compaction to start on table - The compaction will acquire a fate transaction lock
+      * that used to block a subsequent online command while the fate transaction lock was held.
+      */
+     while (System.currentTimeMillis() < (startWait + maxWait)) {
+ 
+       try {
+ 
+         List<ActiveCompaction> activeCompactions = new ArrayList<>();
+ 
+         for (String tserver : tservers) {
 -          List<ActiveCompaction> ac = connector.instanceOperations().getActiveCompactions(tserver);
++          List<ActiveCompaction> ac = client.instanceOperations().getActiveCompactions(tserver);
+           activeCompactions.addAll(ac);
+           // runningCompactions += ac.size();
+           log.trace("tserver {}, running compactions {}", tservers, ac.size());
+         }
+ 
+         if (!activeCompactions.isEmpty()) {
+           try {
+             for (ActiveCompaction compaction : activeCompactions) {
+               log.debug("Compaction running for {}", compaction.getTable());
+               if (compaction.getTable().compareTo(tableName) == 0) {
+                 return true;
+               }
+             }
+           } catch (TableNotFoundException ex) {
+             log.trace("Compaction found for unknown table {}", activeCompactions);
+           }
+         }
+       } catch (AccumuloSecurityException | AccumuloException ex) {
+         throw new IllegalStateException("failed to get active compactions, test fails.", ex);
+       }
+ 
+       try {
+         Thread.sleep(3_000);
+       } catch (InterruptedException ex) {
+         // reassert interrupt
+         Thread.currentThread().interrupt();
+       }
+     }
+ 
+     log.debug("Could not find compaction for {} after {} seconds", tableName,
+         TimeUnit.MILLISECONDS.toSeconds(maxWait));
+ 
+     return false;
+ 
+   }
+ 
+   /**
+    * Will block as long as the underlying compaction task is running. This method is intended to be
+    * used when the compaction is cancelled via the table operation cancel method - when the cancel
+    * command completes, the running task will terminate and then this method will return.
+    *
+    * @return true if the task returned.
+    */
+   public boolean blockWhileCompactionRunning() {
+ 
+     try {
+       if (compactTask == null) {
+         throw new IllegalStateException(
+             "Compaction task has not been started - call startCompactTask() before blocking");
+       }
+       compactTask.get();
+       return true;
+     } catch (InterruptedException ex) {
+       Thread.currentThread().interrupt();
+       return false;
+     } catch (ExecutionException ex) {
+       return false;
+     }
+   }
+ 
+ }
