This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch 1.7 in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 505d999b6944d8a91201257a4891a1ced6f64e25 Author: Keith Turner <ktur...@apache.org> AuthorDate: Fri Feb 2 12:12:25 2018 -0500 Revert "ACCUMULO-4778 Cache table name to id map (#364)" This reverts commit 5adeb4b7ed561a0bcea1a1def17835310831662f. --- .../client/impl/MultiTableBatchWriterImpl.java | 79 ++++++++++++- .../apache/accumulo/core/client/impl/TableMap.java | 100 ----------------- .../apache/accumulo/core/client/impl/Tables.java | 123 +++++++++++---------- .../accumulo/test/MultiTableBatchWriterIT.java | 119 +++++++++++++++++++- 4 files changed, 259 insertions(+), 162 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java index e7a6d73..f5e1fa0 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java @@ -19,26 +19,37 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; +import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.master.state.tables.TableState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.UncheckedExecutionException; public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { + public static final long DEFAULT_CACHE_TIME = 200; + public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.MILLISECONDS; private static final Logger log = LoggerFactory.getLogger(MultiTableBatchWriterImpl.class); private AtomicBoolean closed; + private AtomicLong cacheLastState; private class TableBatchWriter implements BatchWriter { @@ -71,17 +82,49 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { } + /** + * CacheLoader which will look up the internal table ID for a given table name. + */ + private class TableNameToIdLoader extends CacheLoader<String,String> { + + @Override + public String load(String tableName) throws Exception { + Instance instance = context.getInstance(); + String tableId = Tables.getNameToIdMap(instance).get(tableName); + + if (tableId == null) + throw new TableNotFoundException(null, tableName, null); + + if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) + throw new TableOfflineException(instance, tableId); + + return tableId; + } + + } + private TabletServerBatchWriter bw; private ConcurrentHashMap<String,BatchWriter> tableWriters; private final ClientContext context; + private final LoadingCache<String,String> nameToIdCache; public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config) { + this(context, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT); + } + + public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { checkArgument(context != null, "context is null"); checkArgument(config != null, "config is null"); + checkArgument(cacheTimeUnit != null, "cacheTimeUnit is null"); this.context = context; this.bw = new TabletServerBatchWriter(context, config); tableWriters = new ConcurrentHashMap<>(); this.closed = new AtomicBoolean(false); + this.cacheLastState = new AtomicLong(0); + + // Potentially up to ~500k used to cache names to IDs with "segments" of (maybe) ~1000 entries + nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(10).maximumSize(10000).initialCapacity(20) + .build(new TableNameToIdLoader()); } @Override @@ -118,7 +161,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { */ private String getId(String tableName) throws TableNotFoundException { try { - return Tables.getTableId(context.inst, tableName); + return nameToIdCache.get(tableName); } catch (UncheckedExecutionException e) { Throwable cause = e.getCause(); @@ -133,6 +176,20 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { } throw e; + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + + log.error("Unexpected exception when fetching table id for " + tableName); + + if (null == cause) { + throw new RuntimeException(e); + } else if (cause instanceof TableNotFoundException) { + throw (TableNotFoundException) cause; + } else if (cause instanceof TableOfflineException) { + throw (TableOfflineException) cause; + } + + throw new RuntimeException(e); } } @@ -140,6 +197,26 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { checkArgument(tableName != null, "tableName is null"); + while (true) { + long cacheResetCount = Tables.getCacheResetCount(); + + // cacheResetCount could change after this point in time, but I think thats ok because just want to ensure this methods sees changes + // made before it was called. + + long internalResetCount = cacheLastState.get(); + + if (cacheResetCount > internalResetCount) { + if (!cacheLastState.compareAndSet(internalResetCount, cacheResetCount)) { + continue; // concurrent operation, lets not possibly move cacheLastState backwards in the case where a thread pauses for along time + } + + nameToIdCache.invalidateAll(); + break; + } + + break; + } + String tableId = getId(tableName); BatchWriter tbw = tableWriters.get(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java deleted file mode 100644 index 3f3d90c..0000000 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java +++ /dev/null @@ -1,100 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.accumulo.core.client.impl; - -import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.accumulo.core.client.impl.Tables.qualified; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.accumulo.core.Constants; -import org.apache.accumulo.core.client.Instance; -import org.apache.accumulo.core.client.NamespaceNotFoundException; -import org.apache.accumulo.core.zookeeper.ZooUtil; -import org.apache.accumulo.fate.zookeeper.ZooCache; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.ImmutableMap; - -/** - * Used for thread safe caching of immutable table ID maps. See ACCUMULO-4778. - */ -public class TableMap { - private static final Logger log = LoggerFactory.getLogger(TableMap.class); - - private final Map<String,String> tableNameToIdMap; - private final Map<String,String> tableIdToNameMap; - - public TableMap(Instance instance, ZooCache zooCache) { - List<String> tableIds = zooCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); - Map<String,String> namespaceIdToNameMap = new HashMap<>(); - ImmutableMap.Builder<String,String> tableNameToIdBuilder = new ImmutableMap.Builder<>(); - ImmutableMap.Builder<String,String> tableIdToNameBuilder = new ImmutableMap.Builder<>(); - // use StringBuilder to construct zPath string efficiently across many tables - StringBuilder zPathBuilder = new StringBuilder(); - zPathBuilder.append(ZooUtil.getRoot(instance)).append(Constants.ZTABLES).append("/"); - int prefixLength = zPathBuilder.length(); - - for (String tableId : tableIds) { - // reset StringBuilder to prefix length before appending ID and suffix - zPathBuilder.setLength(prefixLength); - zPathBuilder.append(tableId).append(Constants.ZTABLE_NAME); - byte[] tableName = zooCache.get(zPathBuilder.toString()); - zPathBuilder.setLength(prefixLength); - zPathBuilder.append(tableId).append(Constants.ZTABLE_NAMESPACE); - byte[] nId = zooCache.get(zPathBuilder.toString()); - - String namespaceName = Namespaces.DEFAULT_NAMESPACE; - // create fully qualified table name - if (nId == null) { - namespaceName = null; - } else { - String namespaceId = new String(nId, UTF_8); - if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) { - try { - namespaceName = namespaceIdToNameMap.get(namespaceId); - if (namespaceName == null) { - namespaceName = Namespaces.getNamespaceName(instance, namespaceId); - namespaceIdToNameMap.put(namespaceId, namespaceName); - } - } catch (NamespaceNotFoundException e) { - log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); - continue; - } - } - } - if (tableName != null && namespaceName != null) { - String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName); - tableNameToIdBuilder.put(tableNameStr, tableId); - tableIdToNameBuilder.put(tableId, tableNameStr); - } - } - tableNameToIdMap = tableNameToIdBuilder.build(); - tableIdToNameMap = tableIdToNameBuilder.build(); - } - - public Map<String,String> getNameToIdMap() { - return tableNameToIdMap; - } - - public Map<String,String> getIdtoNameMap() { - return tableIdToNameMap; - } -} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java index a93347c..fcf838f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java @@ -20,11 +20,12 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; import java.security.SecurityPermission; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; @@ -36,49 +37,64 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; - -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Tables { + private static final Logger log = LoggerFactory.getLogger(Tables.class); public static final String VALID_NAME_REGEX = "^(\\w+\\.)?(\\w+)$"; private static final SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission"); - // Per instance cache will expire after 10 minutes in case we encounter an instance not used frequently - private static Cache<String,TableMap> instanceToMapCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); - private static Cache<String,ZooCache> instanceToZooCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + private static final AtomicLong cacheResetCount = new AtomicLong(0); - /** - * Return the cached ZooCache for provided instance. ZooCache is initially created with a watcher that will clear the TableMap cache for that instance when - * WatchedEvent occurs. - */ - private static ZooCache getZooCache(final Instance instance) { + private static ZooCache getZooCache(Instance instance) { SecurityManager sm = System.getSecurityManager(); if (sm != null) { sm.checkPermission(TABLES_PERMISSION); } - final String zks = instance.getZooKeepers(); - final int timeOut = instance.getZooKeepersSessionTimeOut(); - final String uuid = instance.getInstanceID(); + return new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + } - try { - return instanceToZooCache.get(uuid, new Callable<ZooCache>() { - @Override - public ZooCache call() { - return new ZooCacheFactory().getZooCache(zks, timeOut, new Watcher() { - @Override - public void process(WatchedEvent watchedEvent) { - instanceToMapCache.invalidate(uuid); + private static SortedMap<String,String> getMap(Instance instance, boolean nameAsKey) { + ZooCache zc = getZooCache(instance); + + List<String> tableIds = zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); + TreeMap<String,String> tableMap = new TreeMap<>(); + Map<String,String> namespaceIdToNameMap = new HashMap<>(); + + for (String tableId : tableIds) { + byte[] tableName = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAME); + byte[] nId = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAMESPACE); + String namespaceName = Namespaces.DEFAULT_NAMESPACE; + // create fully qualified table name + if (nId == null) { + namespaceName = null; + } else { + String namespaceId = new String(nId, UTF_8); + if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) { + try { + namespaceName = namespaceIdToNameMap.get(namespaceId); + if (namespaceName == null) { + namespaceName = Namespaces.getNamespaceName(instance, namespaceId); + namespaceIdToNameMap.put(namespaceId, namespaceName); } - }); + } catch (NamespaceNotFoundException e) { + log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); + continue; + } } - }); - } catch (ExecutionException e) { - throw new RuntimeException(e); + } + if (tableName != null && namespaceName != null) { + String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName); + if (nameAsKey) + tableMap.put(tableNameStr, tableId); + else + tableMap.put(tableId, tableNameStr); + } } + + return tableMap; } public static String getTableId(Instance instance, String tableName) throws TableNotFoundException { @@ -113,31 +129,12 @@ public class Tables { return tableName; } - public static Map<String,String> getNameToIdMap(Instance instance) { - return getTableMap(instance).getNameToIdMap(); - } - - public static Map<String,String> getIdToNameMap(Instance instance) { - return getTableMap(instance).getIdtoNameMap(); + public static SortedMap<String,String> getNameToIdMap(Instance instance) { + return getMap(instance, true); } - /** - * Get the TableMap from the cache. A new one will be populated when needed. Cache is cleared manually by calling {@link #clearCache(Instance)} or - * automatically cleared by ZooCache watcher created in {@link #getZooCache(Instance)}. See ACCUMULO-4778. - */ - private static TableMap getTableMap(final Instance instance) { - TableMap map; - try { - map = instanceToMapCache.get(instance.getInstanceID(), new Callable<TableMap>() { - @Override - public TableMap call() { - return new TableMap(instance, getZooCache(instance)); - } - }); - } catch (ExecutionException e) { - throw new RuntimeException(e); - } - return map; + public static SortedMap<String,String> getIdToNameMap(Instance instance) { + return getMap(instance, false); } public static boolean exists(Instance instance, String tableId) { @@ -147,9 +144,9 @@ public class Tables { } public static void clearCache(Instance instance) { + cacheResetCount.incrementAndGet(); getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZTABLES); getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES); - instanceToMapCache.invalidate(instance.getInstanceID()); } /** @@ -161,9 +158,17 @@ public class Tables { * A zookeeper path */ public static void clearCacheByPath(Instance instance, final String zooPath) { - String thePath = zooPath.startsWith("/") ? zooPath : "/" + zooPath; + + String thePath; + + if (zooPath.startsWith("/")) { + thePath = zooPath; + } else { + thePath = "/" + zooPath; + } + getZooCache(instance).clear(ZooUtil.getRoot(instance) + thePath); - instanceToMapCache.invalidate(instance.getInstanceID()); + } public static String getPrintableTableNameFromId(Map<String,String> tidToNameMap, String tableId) { @@ -224,6 +229,10 @@ public class Tables { } + public static long getCacheResetCount() { + return cacheResetCount.get(); + } + public static String qualified(String tableName) { return qualified(tableName, Namespaces.DEFAULT_NAMESPACE); } diff --git a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java index fa5d8bb..f9720f0 100644 --- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -30,6 +31,7 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.impl.Credentials; @@ -59,12 +61,12 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT { @Before public void setUpArgs() throws AccumuloException, AccumuloSecurityException { connector = getConnector(); - mtbw = getMultiTableBatchWriter(); + mtbw = getMultiTableBatchWriter(60); } - public MultiTableBatchWriter getMultiTableBatchWriter() { + public MultiTableBatchWriter getMultiTableBatchWriter(long cacheTimeoutInSeconds) { ClientContext context = new ClientContext(connector.getInstance(), new Credentials(getAdminPrincipal(), getAdminToken()), getCluster().getClientConfig()); - return new MultiTableBatchWriterImpl(context, new BatchWriterConfig()); + return new MultiTableBatchWriterImpl(context, new BatchWriterConfig(), cacheTimeoutInSeconds, TimeUnit.SECONDS); } @Test @@ -263,7 +265,7 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT { @Test public void testTableRenameNewWritersNoCaching() throws Exception { - mtbw = getMultiTableBatchWriter(); + mtbw = getMultiTableBatchWriter(0); try { final String[] names = getUniqueNames(4); @@ -404,4 +406,113 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT { Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected); } + + @Test + public void testOfflineTableWithCache() throws Exception { + boolean mutationsRejected = false; + + try { + final String[] names = getUniqueNames(2); + final String table1 = names[0], table2 = names[1]; + + TableOperations tops = connector.tableOperations(); + tops.create(table1); + tops.create(table2); + + BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2); + + Mutation m1 = new Mutation("foo"); + m1.put("col1", "", "val1"); + m1.put("col2", "", "val2"); + + bw1.addMutation(m1); + bw2.addMutation(m1); + + tops.offline(table1); + + try { + bw1 = mtbw.getBatchWriter(table1); + } catch (TableOfflineException e) { + // pass + mutationsRejected = true; + } + + tops.offline(table2); + + try { + bw2 = mtbw.getBatchWriter(table2); + } catch (TableOfflineException e) { + // pass + mutationsRejected = true; + } + } finally { + if (null != mtbw) { + try { + // Mutations might have flushed before the table offline occurred + mtbw.close(); + } catch (MutationsRejectedException e) { + // Pass + mutationsRejected = true; + } + } + } + + Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected); + } + + @Test + public void testOfflineTableWithoutCache() throws Exception { + mtbw = getMultiTableBatchWriter(0); + boolean mutationsRejected = false; + + try { + final String[] names = getUniqueNames(2); + final String table1 = names[0], table2 = names[1]; + + TableOperations tops = connector.tableOperations(); + tops.create(table1); + tops.create(table2); + + BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2); + + Mutation m1 = new Mutation("foo"); + m1.put("col1", "", "val1"); + m1.put("col2", "", "val2"); + + bw1.addMutation(m1); + bw2.addMutation(m1); + + // Mutations might or might not flush before tables goes offline + tops.offline(table1); + tops.offline(table2); + + try { + bw1 = mtbw.getBatchWriter(table1); + Assert.fail(table1 + " should be offline"); + } catch (TableOfflineException e) { + // pass + mutationsRejected = true; + } + + try { + bw2 = mtbw.getBatchWriter(table2); + Assert.fail(table1 + " should be offline"); + } catch (TableOfflineException e) { + // pass + mutationsRejected = true; + } + } finally { + if (null != mtbw) { + try { + // Mutations might have flushed before the table offline occurred + mtbw.close(); + } catch (MutationsRejectedException e) { + // Pass + mutationsRejected = true; + } + } + } + + Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected); + } } -- To stop receiving notification emails like this one, please contact ktur...@apache.org.