This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 1.7
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 505d999b6944d8a91201257a4891a1ced6f64e25
Author: Keith Turner <ktur...@apache.org>
AuthorDate: Fri Feb 2 12:12:25 2018 -0500

    Revert "ACCUMULO-4778 Cache table name to id map (#364)"
    
    This reverts commit 5adeb4b7ed561a0bcea1a1def17835310831662f.
---
 .../client/impl/MultiTableBatchWriterImpl.java     |  79 ++++++++++++-
 .../apache/accumulo/core/client/impl/TableMap.java | 100 -----------------
 .../apache/accumulo/core/client/impl/Tables.java   | 123 +++++++++++----------
 .../accumulo/test/MultiTableBatchWriterIT.java     | 119 +++++++++++++++++++-
 4 files changed, 259 insertions(+), 162 deletions(-)

diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
 
b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
index e7a6d73..f5e1fa0 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java
@@ -19,26 +19,37 @@ package org.apache.accumulo.core.client.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.Instance;
 import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.TableNotFoundException;
 import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.master.state.tables.TableState;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.util.concurrent.UncheckedExecutionException;
 
 public class MultiTableBatchWriterImpl implements MultiTableBatchWriter {
+  public static final long DEFAULT_CACHE_TIME = 200;
+  public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.MILLISECONDS;
 
   private static final Logger log = 
LoggerFactory.getLogger(MultiTableBatchWriterImpl.class);
   private AtomicBoolean closed;
+  private AtomicLong cacheLastState;
 
   private class TableBatchWriter implements BatchWriter {
 
@@ -71,17 +82,49 @@ public class MultiTableBatchWriterImpl implements 
MultiTableBatchWriter {
 
   }
 
+  /**
+   * CacheLoader which will look up the internal table ID for a given table 
name.
+   */
+  private class TableNameToIdLoader extends CacheLoader<String,String> {
+
+    @Override
+    public String load(String tableName) throws Exception {
+      Instance instance = context.getInstance();
+      String tableId = Tables.getNameToIdMap(instance).get(tableName);
+
+      if (tableId == null)
+        throw new TableNotFoundException(null, tableName, null);
+
+      if (Tables.getTableState(instance, tableId) == TableState.OFFLINE)
+        throw new TableOfflineException(instance, tableId);
+
+      return tableId;
+    }
+
+  }
+
   private TabletServerBatchWriter bw;
   private ConcurrentHashMap<String,BatchWriter> tableWriters;
   private final ClientContext context;
+  private final LoadingCache<String,String> nameToIdCache;
 
   public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig 
config) {
+    this(context, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT);
+  }
+
+  public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig 
config, long cacheTime, TimeUnit cacheTimeUnit) {
     checkArgument(context != null, "context is null");
     checkArgument(config != null, "config is null");
+    checkArgument(cacheTimeUnit != null, "cacheTimeUnit is null");
     this.context = context;
     this.bw = new TabletServerBatchWriter(context, config);
     tableWriters = new ConcurrentHashMap<>();
     this.closed = new AtomicBoolean(false);
+    this.cacheLastState = new AtomicLong(0);
+
+    // Potentially up to ~500k used to cache names to IDs with "segments" of 
(maybe) ~1000 entries
+    nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, 
cacheTimeUnit).concurrencyLevel(10).maximumSize(10000).initialCapacity(20)
+        .build(new TableNameToIdLoader());
   }
 
   @Override
@@ -118,7 +161,7 @@ public class MultiTableBatchWriterImpl implements 
MultiTableBatchWriter {
    */
   private String getId(String tableName) throws TableNotFoundException {
     try {
-      return Tables.getTableId(context.inst, tableName);
+      return nameToIdCache.get(tableName);
     } catch (UncheckedExecutionException e) {
       Throwable cause = e.getCause();
 
@@ -133,6 +176,20 @@ public class MultiTableBatchWriterImpl implements 
MultiTableBatchWriter {
       }
 
       throw e;
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+
+      log.error("Unexpected exception when fetching table id for " + 
tableName);
+
+      if (null == cause) {
+        throw new RuntimeException(e);
+      } else if (cause instanceof TableNotFoundException) {
+        throw (TableNotFoundException) cause;
+      } else if (cause instanceof TableOfflineException) {
+        throw (TableOfflineException) cause;
+      }
+
+      throw new RuntimeException(e);
     }
   }
 
@@ -140,6 +197,26 @@ public class MultiTableBatchWriterImpl implements 
MultiTableBatchWriter {
   public BatchWriter getBatchWriter(String tableName) throws 
AccumuloException, AccumuloSecurityException, TableNotFoundException {
     checkArgument(tableName != null, "tableName is null");
 
+    while (true) {
+      long cacheResetCount = Tables.getCacheResetCount();
+
+      // cacheResetCount could change after this point in time, but I think 
that's ok because we just want to ensure this method sees changes
+      // made before it was called.
+
+      long internalResetCount = cacheLastState.get();
+
+      if (cacheResetCount > internalResetCount) {
+        if (!cacheLastState.compareAndSet(internalResetCount, 
cacheResetCount)) {
+          continue; // concurrent operation, let's not possibly move 
cacheLastState backwards in the case where a thread pauses for a long time
+        }
+
+        nameToIdCache.invalidateAll();
+        break;
+      }
+
+      break;
+    }
+
     String tableId = getId(tableName);
 
     BatchWriter tbw = tableWriters.get(tableId);
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java 
b/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java
deleted file mode 100644
index 3f3d90c..0000000
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.accumulo.core.client.impl;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.accumulo.core.client.impl.Tables.qualified;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.Instance;
-import org.apache.accumulo.core.client.NamespaceNotFoundException;
-import org.apache.accumulo.core.zookeeper.ZooUtil;
-import org.apache.accumulo.fate.zookeeper.ZooCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.ImmutableMap;
-
-/**
- * Used for thread safe caching of immutable table ID maps. See ACCUMULO-4778.
- */
-public class TableMap {
-  private static final Logger log = LoggerFactory.getLogger(TableMap.class);
-
-  private final Map<String,String> tableNameToIdMap;
-  private final Map<String,String> tableIdToNameMap;
-
-  public TableMap(Instance instance, ZooCache zooCache) {
-    List<String> tableIds = zooCache.getChildren(ZooUtil.getRoot(instance) + 
Constants.ZTABLES);
-    Map<String,String> namespaceIdToNameMap = new HashMap<>();
-    ImmutableMap.Builder<String,String> tableNameToIdBuilder = new 
ImmutableMap.Builder<>();
-    ImmutableMap.Builder<String,String> tableIdToNameBuilder = new 
ImmutableMap.Builder<>();
-    // use StringBuilder to construct zPath string efficiently across many 
tables
-    StringBuilder zPathBuilder = new StringBuilder();
-    
zPathBuilder.append(ZooUtil.getRoot(instance)).append(Constants.ZTABLES).append("/");
-    int prefixLength = zPathBuilder.length();
-
-    for (String tableId : tableIds) {
-      // reset StringBuilder to prefix length before appending ID and suffix
-      zPathBuilder.setLength(prefixLength);
-      zPathBuilder.append(tableId).append(Constants.ZTABLE_NAME);
-      byte[] tableName = zooCache.get(zPathBuilder.toString());
-      zPathBuilder.setLength(prefixLength);
-      zPathBuilder.append(tableId).append(Constants.ZTABLE_NAMESPACE);
-      byte[] nId = zooCache.get(zPathBuilder.toString());
-
-      String namespaceName = Namespaces.DEFAULT_NAMESPACE;
-      // create fully qualified table name
-      if (nId == null) {
-        namespaceName = null;
-      } else {
-        String namespaceId = new String(nId, UTF_8);
-        if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) {
-          try {
-            namespaceName = namespaceIdToNameMap.get(namespaceId);
-            if (namespaceName == null) {
-              namespaceName = Namespaces.getNamespaceName(instance, 
namespaceId);
-              namespaceIdToNameMap.put(namespaceId, namespaceName);
-            }
-          } catch (NamespaceNotFoundException e) {
-            log.error("Table (" + tableId + ") contains reference to namespace 
(" + namespaceId + ") that doesn't exist", e);
-            continue;
-          }
-        }
-      }
-      if (tableName != null && namespaceName != null) {
-        String tableNameStr = qualified(new String(tableName, UTF_8), 
namespaceName);
-        tableNameToIdBuilder.put(tableNameStr, tableId);
-        tableIdToNameBuilder.put(tableId, tableNameStr);
-      }
-    }
-    tableNameToIdMap = tableNameToIdBuilder.build();
-    tableIdToNameMap = tableIdToNameBuilder.build();
-  }
-
-  public Map<String,String> getNameToIdMap() {
-    return tableNameToIdMap;
-  }
-
-  public Map<String,String> getIdtoNameMap() {
-    return tableIdToNameMap;
-  }
-}
diff --git 
a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java 
b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
index a93347c..fcf838f 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java
@@ -20,11 +20,12 @@ import static 
com.google.common.base.Preconditions.checkArgument;
 import static java.nio.charset.StandardCharsets.UTF_8;
 
 import java.security.SecurityPermission;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.accumulo.core.Constants;
 import org.apache.accumulo.core.client.Instance;
@@ -36,49 +37,64 @@ import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.zookeeper.ZooUtil;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCacheFactory;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class Tables {
+  private static final Logger log = LoggerFactory.getLogger(Tables.class);
 
   public static final String VALID_NAME_REGEX = "^(\\w+\\.)?(\\w+)$";
 
   private static final SecurityPermission TABLES_PERMISSION = new 
SecurityPermission("tablesPermission");
-  // Per instance cache will expire after 10 minutes in case we encounter an 
instance not used frequently
-  private static Cache<String,TableMap> instanceToMapCache = 
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
-  private static Cache<String,ZooCache> instanceToZooCache = 
CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
+  private static final AtomicLong cacheResetCount = new AtomicLong(0);
 
-  /**
-   * Return the cached ZooCache for provided instance. ZooCache is initially 
created with a watcher that will clear the TableMap cache for that instance when
-   * WatchedEvent occurs.
-   */
-  private static ZooCache getZooCache(final Instance instance) {
+  private static ZooCache getZooCache(Instance instance) {
     SecurityManager sm = System.getSecurityManager();
     if (sm != null) {
       sm.checkPermission(TABLES_PERMISSION);
     }
-    final String zks = instance.getZooKeepers();
-    final int timeOut = instance.getZooKeepersSessionTimeOut();
-    final String uuid = instance.getInstanceID();
+    return new ZooCacheFactory().getZooCache(instance.getZooKeepers(), 
instance.getZooKeepersSessionTimeOut());
+  }
 
-    try {
-      return instanceToZooCache.get(uuid, new Callable<ZooCache>() {
-        @Override
-        public ZooCache call() {
-          return new ZooCacheFactory().getZooCache(zks, timeOut, new Watcher() 
{
-            @Override
-            public void process(WatchedEvent watchedEvent) {
-              instanceToMapCache.invalidate(uuid);
+  private static SortedMap<String,String> getMap(Instance instance, boolean 
nameAsKey) {
+    ZooCache zc = getZooCache(instance);
+
+    List<String> tableIds = zc.getChildren(ZooUtil.getRoot(instance) + 
Constants.ZTABLES);
+    TreeMap<String,String> tableMap = new TreeMap<>();
+    Map<String,String> namespaceIdToNameMap = new HashMap<>();
+
+    for (String tableId : tableIds) {
+      byte[] tableName = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES 
+ "/" + tableId + Constants.ZTABLE_NAME);
+      byte[] nId = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" 
+ tableId + Constants.ZTABLE_NAMESPACE);
+      String namespaceName = Namespaces.DEFAULT_NAMESPACE;
+      // create fully qualified table name
+      if (nId == null) {
+        namespaceName = null;
+      } else {
+        String namespaceId = new String(nId, UTF_8);
+        if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) {
+          try {
+            namespaceName = namespaceIdToNameMap.get(namespaceId);
+            if (namespaceName == null) {
+              namespaceName = Namespaces.getNamespaceName(instance, 
namespaceId);
+              namespaceIdToNameMap.put(namespaceId, namespaceName);
             }
-          });
+          } catch (NamespaceNotFoundException e) {
+            log.error("Table (" + tableId + ") contains reference to namespace 
(" + namespaceId + ") that doesn't exist", e);
+            continue;
+          }
         }
-      });
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e);
+      }
+      if (tableName != null && namespaceName != null) {
+        String tableNameStr = qualified(new String(tableName, UTF_8), 
namespaceName);
+        if (nameAsKey)
+          tableMap.put(tableNameStr, tableId);
+        else
+          tableMap.put(tableId, tableNameStr);
+      }
     }
+
+    return tableMap;
   }
 
   public static String getTableId(Instance instance, String tableName) throws 
TableNotFoundException {
@@ -113,31 +129,12 @@ public class Tables {
     return tableName;
   }
 
-  public static Map<String,String> getNameToIdMap(Instance instance) {
-    return getTableMap(instance).getNameToIdMap();
-  }
-
-  public static Map<String,String> getIdToNameMap(Instance instance) {
-    return getTableMap(instance).getIdtoNameMap();
+  public static SortedMap<String,String> getNameToIdMap(Instance instance) {
+    return getMap(instance, true);
   }
 
-  /**
-   * Get the TableMap from the cache. A new one will be populated when needed. 
Cache is cleared manually by calling {@link #clearCache(Instance)} or
-   * automatically cleared by ZooCache watcher created in {@link 
#getZooCache(Instance)}. See ACCUMULO-4778.
-   */
-  private static TableMap getTableMap(final Instance instance) {
-    TableMap map;
-    try {
-      map = instanceToMapCache.get(instance.getInstanceID(), new 
Callable<TableMap>() {
-        @Override
-        public TableMap call() {
-          return new TableMap(instance, getZooCache(instance));
-        }
-      });
-    } catch (ExecutionException e) {
-      throw new RuntimeException(e);
-    }
-    return map;
+  public static SortedMap<String,String> getIdToNameMap(Instance instance) {
+    return getMap(instance, false);
   }
 
   public static boolean exists(Instance instance, String tableId) {
@@ -147,9 +144,9 @@ public class Tables {
   }
 
   public static void clearCache(Instance instance) {
+    cacheResetCount.incrementAndGet();
     getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZTABLES);
     getZooCache(instance).clear(ZooUtil.getRoot(instance) + 
Constants.ZNAMESPACES);
-    instanceToMapCache.invalidate(instance.getInstanceID());
   }
 
   /**
@@ -161,9 +158,17 @@ public class Tables {
    *          A zookeeper path
    */
   public static void clearCacheByPath(Instance instance, final String zooPath) 
{
-    String thePath = zooPath.startsWith("/") ? zooPath : "/" + zooPath;
+
+    String thePath;
+
+    if (zooPath.startsWith("/")) {
+      thePath = zooPath;
+    } else {
+      thePath = "/" + zooPath;
+    }
+
     getZooCache(instance).clear(ZooUtil.getRoot(instance) + thePath);
-    instanceToMapCache.invalidate(instance.getInstanceID());
+
   }
 
   public static String getPrintableTableNameFromId(Map<String,String> 
tidToNameMap, String tableId) {
@@ -224,6 +229,10 @@ public class Tables {
 
   }
 
+  public static long getCacheResetCount() {
+    return cacheResetCount.get();
+  }
+
   public static String qualified(String tableName) {
     return qualified(tableName, Namespaces.DEFAULT_NAMESPACE);
   }
diff --git 
a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java 
b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java
index fa5d8bb..f9720f0 100644
--- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java
+++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java
@@ -20,6 +20,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -30,6 +31,7 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.client.TableOfflineException;
 import org.apache.accumulo.core.client.admin.TableOperations;
 import org.apache.accumulo.core.client.impl.ClientContext;
 import org.apache.accumulo.core.client.impl.Credentials;
@@ -59,12 +61,12 @@ public class MultiTableBatchWriterIT extends 
AccumuloClusterIT {
   @Before
   public void setUpArgs() throws AccumuloException, AccumuloSecurityException {
     connector = getConnector();
-    mtbw = getMultiTableBatchWriter();
+    mtbw = getMultiTableBatchWriter(60);
   }
 
-  public MultiTableBatchWriter getMultiTableBatchWriter() {
+  public MultiTableBatchWriter getMultiTableBatchWriter(long 
cacheTimeoutInSeconds) {
     ClientContext context = new ClientContext(connector.getInstance(), new 
Credentials(getAdminPrincipal(), getAdminToken()), 
getCluster().getClientConfig());
-    return new MultiTableBatchWriterImpl(context, new BatchWriterConfig());
+    return new MultiTableBatchWriterImpl(context, new BatchWriterConfig(), 
cacheTimeoutInSeconds, TimeUnit.SECONDS);
   }
 
   @Test
@@ -263,7 +265,7 @@ public class MultiTableBatchWriterIT extends 
AccumuloClusterIT {
 
   @Test
   public void testTableRenameNewWritersNoCaching() throws Exception {
-    mtbw = getMultiTableBatchWriter();
+    mtbw = getMultiTableBatchWriter(0);
 
     try {
       final String[] names = getUniqueNames(4);
@@ -404,4 +406,113 @@ public class MultiTableBatchWriterIT extends 
AccumuloClusterIT {
 
     Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
   }
+
+  @Test
+  public void testOfflineTableWithCache() throws Exception {
+    boolean mutationsRejected = false;
+
+    try {
+      final String[] names = getUniqueNames(2);
+      final String table1 = names[0], table2 = names[1];
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = 
mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      tops.offline(table1);
+
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+      } catch (TableOfflineException e) {
+        // pass
+        mutationsRejected = true;
+      }
+
+      tops.offline(table2);
+
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+      } catch (TableOfflineException e) {
+        // pass
+        mutationsRejected = true;
+      }
+    } finally {
+      if (null != mtbw) {
+        try {
+          // Mutations might have flushed before the table offline occurred
+          mtbw.close();
+        } catch (MutationsRejectedException e) {
+          // Pass
+          mutationsRejected = true;
+        }
+      }
+    }
+
+    Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
+  }
+
+  @Test
+  public void testOfflineTableWithoutCache() throws Exception {
+    mtbw = getMultiTableBatchWriter(0);
+    boolean mutationsRejected = false;
+
+    try {
+      final String[] names = getUniqueNames(2);
+      final String table1 = names[0], table2 = names[1];
+
+      TableOperations tops = connector.tableOperations();
+      tops.create(table1);
+      tops.create(table2);
+
+      BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = 
mtbw.getBatchWriter(table2);
+
+      Mutation m1 = new Mutation("foo");
+      m1.put("col1", "", "val1");
+      m1.put("col2", "", "val2");
+
+      bw1.addMutation(m1);
+      bw2.addMutation(m1);
+
+      // Mutations might or might not flush before the tables go offline
+      tops.offline(table1);
+      tops.offline(table2);
+
+      try {
+        bw1 = mtbw.getBatchWriter(table1);
+        Assert.fail(table1 + " should be offline");
+      } catch (TableOfflineException e) {
+        // pass
+        mutationsRejected = true;
+      }
+
+      try {
+        bw2 = mtbw.getBatchWriter(table2);
+        Assert.fail(table1 + " should be offline");
+      } catch (TableOfflineException e) {
+        // pass
+        mutationsRejected = true;
+      }
+    } finally {
+      if (null != mtbw) {
+        try {
+          // Mutations might have flushed before the table offline occurred
+          mtbw.close();
+        } catch (MutationsRejectedException e) {
+          // Pass
+          mutationsRejected = true;
+        }
+      }
+    }
+
+    Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected);
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
ktur...@apache.org.

Reply via email to