This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch 1.7 in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/1.7 by this push: new 5adeb4b ACCUMULO-4778 Cache table name to id map (#364) 5adeb4b is described below commit 5adeb4b7ed561a0bcea1a1def17835310831662f Author: Mike Miller <mmil...@apache.org> AuthorDate: Tue Jan 30 16:21:13 2018 -0500 ACCUMULO-4778 Cache table name to id map (#364) * Improved performance anytime tableIdMap is accessed (by the API or internally) * New class TableMap is cached per instance using Guava Cache * Added watcher on Tables ZooCache that will refresh the TableMap on any ZK table updates * Removed now obsolete internal cache from MultiTableBatchWriter --- .../client/impl/MultiTableBatchWriterImpl.java | 79 +------------ .../apache/accumulo/core/client/impl/TableMap.java | 100 +++++++++++++++++ .../apache/accumulo/core/client/impl/Tables.java | 123 ++++++++++----------- .../accumulo/test/MultiTableBatchWriterIT.java | 119 +------------------- 4 files changed, 162 insertions(+), 259 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java index f5e1fa0..e7a6d73 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java @@ -19,37 +19,26 @@ package org.apache.accumulo.core.client.impl; import static com.google.common.base.Preconditions.checkArgument; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; -import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.data.Mutation; -import org.apache.accumulo.core.master.state.tables.TableState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; import com.google.common.util.concurrent.UncheckedExecutionException; public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { - public static final long DEFAULT_CACHE_TIME = 200; - public static final TimeUnit DEFAULT_CACHE_TIME_UNIT = TimeUnit.MILLISECONDS; private static final Logger log = LoggerFactory.getLogger(MultiTableBatchWriterImpl.class); private AtomicBoolean closed; - private AtomicLong cacheLastState; private class TableBatchWriter implements BatchWriter { @@ -82,49 +71,17 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { } - /** - * CacheLoader which will look up the internal table ID for a given table name. - */ - private class TableNameToIdLoader extends CacheLoader<String,String> { - - @Override - public String load(String tableName) throws Exception { - Instance instance = context.getInstance(); - String tableId = Tables.getNameToIdMap(instance).get(tableName); - - if (tableId == null) - throw new TableNotFoundException(null, tableName, null); - - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId); - - return tableId; - } - - } - private TabletServerBatchWriter bw; private ConcurrentHashMap<String,BatchWriter> tableWriters; private final ClientContext context; - private final LoadingCache<String,String> nameToIdCache; public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config) { - this(context, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT); - } - - public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { checkArgument(context != null, "context is null"); checkArgument(config != null, "config is null"); - checkArgument(cacheTimeUnit != null, "cacheTimeUnit is null"); this.context = context; this.bw = new TabletServerBatchWriter(context, config); tableWriters = new ConcurrentHashMap<>(); this.closed = new AtomicBoolean(false); - this.cacheLastState = new AtomicLong(0); - - // Potentially up to ~500k used to cache names to IDs with "segments" of (maybe) ~1000 entries - nameToIdCache = CacheBuilder.newBuilder().expireAfterWrite(cacheTime, cacheTimeUnit).concurrencyLevel(10).maximumSize(10000).initialCapacity(20) - .build(new TableNameToIdLoader()); } @Override @@ -161,7 +118,7 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { */ private String getId(String tableName) throws TableNotFoundException { try { - return nameToIdCache.get(tableName); + return Tables.getTableId(context.inst, tableName); } catch (UncheckedExecutionException e) { Throwable cause = e.getCause(); @@ -176,20 +133,6 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { } throw e; - } catch (ExecutionException e) { - Throwable cause = e.getCause(); - - log.error("Unexpected exception when fetching table id for " + tableName); - - if (null == cause) { - throw new RuntimeException(e); - } else if (cause instanceof TableNotFoundException) { - throw (TableNotFoundException) cause; - } else if (cause instanceof TableOfflineException) { - throw (TableOfflineException) cause; - } - - throw new RuntimeException(e); } } @@ -197,26 +140,6 @@ public class MultiTableBatchWriterImpl implements MultiTableBatchWriter { public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { checkArgument(tableName != null, "tableName is null"); - while (true) { - long cacheResetCount = Tables.getCacheResetCount(); - - // cacheResetCount could change after this point in time, but I think thats ok because just want to ensure this methods sees changes - // made before it was called. - - long internalResetCount = cacheLastState.get(); - - if (cacheResetCount > internalResetCount) { - if (!cacheLastState.compareAndSet(internalResetCount, cacheResetCount)) { - continue; // concurrent operation, lets not possibly move cacheLastState backwards in the case where a thread pauses for along time - } - - nameToIdCache.invalidateAll(); - break; - } - - break; - } - String tableId = getId(tableName); BatchWriter tbw = tableWriters.get(tableId); diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java b/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java new file mode 100644 index 0000000..3f3d90c --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client.impl; + +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.apache.accumulo.core.client.impl.Tables.qualified; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.client.NamespaceNotFoundException; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ImmutableMap; + +/** + * Used for thread safe caching of immutable table ID maps. See ACCUMULO-4778. + */ +public class TableMap { + private static final Logger log = LoggerFactory.getLogger(TableMap.class); + + private final Map<String,String> tableNameToIdMap; + private final Map<String,String> tableIdToNameMap; + + public TableMap(Instance instance, ZooCache zooCache) { + List<String> tableIds = zooCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); + Map<String,String> namespaceIdToNameMap = new HashMap<>(); + ImmutableMap.Builder<String,String> tableNameToIdBuilder = new ImmutableMap.Builder<>(); + ImmutableMap.Builder<String,String> tableIdToNameBuilder = new ImmutableMap.Builder<>(); + // use StringBuilder to construct zPath string efficiently across many tables + StringBuilder zPathBuilder = new StringBuilder(); + zPathBuilder.append(ZooUtil.getRoot(instance)).append(Constants.ZTABLES).append("/"); + int prefixLength = zPathBuilder.length(); + + for (String tableId : tableIds) { + // reset StringBuilder to prefix length before appending ID and suffix + zPathBuilder.setLength(prefixLength); + zPathBuilder.append(tableId).append(Constants.ZTABLE_NAME); + byte[] tableName = zooCache.get(zPathBuilder.toString()); + zPathBuilder.setLength(prefixLength); + zPathBuilder.append(tableId).append(Constants.ZTABLE_NAMESPACE); + byte[] nId = zooCache.get(zPathBuilder.toString()); + + String namespaceName = Namespaces.DEFAULT_NAMESPACE; + // create fully qualified table name + if (nId == null) { + namespaceName = null; + } else { + String namespaceId = new String(nId, UTF_8); + if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) { + try { + namespaceName = namespaceIdToNameMap.get(namespaceId); + if (namespaceName == null) { + namespaceName = Namespaces.getNamespaceName(instance, namespaceId); + namespaceIdToNameMap.put(namespaceId, namespaceName); + } + } catch (NamespaceNotFoundException e) { + log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); + continue; + } + } + } + if (tableName != null && namespaceName != null) { + String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName); + tableNameToIdBuilder.put(tableNameStr, tableId); + tableIdToNameBuilder.put(tableId, tableNameStr); + } + } + tableNameToIdMap = tableNameToIdBuilder.build(); + tableIdToNameMap = tableIdToNameBuilder.build(); + } + + public Map<String,String> getNameToIdMap() { + return tableNameToIdMap; + } + + public Map<String,String> getIdtoNameMap() { + return tableIdToNameMap; + } +} diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java index fcf838f..a93347c 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java @@ -20,12 +20,11 @@ import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; import java.security.SecurityPermission; -import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; @@ -37,64 +36,49 @@ import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; public class Tables { - private static final Logger log = LoggerFactory.getLogger(Tables.class); public static final String VALID_NAME_REGEX = "^(\\w+\\.)?(\\w+)$"; private static final SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission"); - private static final AtomicLong cacheResetCount = new AtomicLong(0); + // Per instance cache will expire after 10 minutes in case we encounter an instance not used frequently + private static Cache<String,TableMap> instanceToMapCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + private static Cache<String,ZooCache> instanceToZooCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); - private static ZooCache getZooCache(Instance instance) { + /** + * Return the cached ZooCache for provided instance. ZooCache is initially created with a watcher that will clear the TableMap cache for that instance when + * WatchedEvent occurs. + */ + private static ZooCache getZooCache(final Instance instance) { SecurityManager sm = System.getSecurityManager(); if (sm != null) { sm.checkPermission(TABLES_PERMISSION); } - return new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); - } + final String zks = instance.getZooKeepers(); + final int timeOut = instance.getZooKeepersSessionTimeOut(); + final String uuid = instance.getInstanceID(); - private static SortedMap<String,String> getMap(Instance instance, boolean nameAsKey) { - ZooCache zc = getZooCache(instance); - - List<String> tableIds = zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); - TreeMap<String,String> tableMap = new TreeMap<>(); - Map<String,String> namespaceIdToNameMap = new HashMap<>(); - - for (String tableId : tableIds) { - byte[] tableName = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAME); - byte[] nId = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAMESPACE); - String namespaceName = Namespaces.DEFAULT_NAMESPACE; - // create fully qualified table name - if (nId == null) { - namespaceName = null; - } else { - String namespaceId = new String(nId, UTF_8); - if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) { - try { - namespaceName = namespaceIdToNameMap.get(namespaceId); - if (namespaceName == null) { - namespaceName = Namespaces.getNamespaceName(instance, namespaceId); - namespaceIdToNameMap.put(namespaceId, namespaceName); + try { + return instanceToZooCache.get(uuid, new Callable<ZooCache>() { + @Override + public ZooCache call() { + return new ZooCacheFactory().getZooCache(zks, timeOut, new Watcher() { + @Override + public void process(WatchedEvent watchedEvent) { + instanceToMapCache.invalidate(uuid); } - } catch (NamespaceNotFoundException e) { - log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); - continue; - } + }); } - } - if (tableName != null && namespaceName != null) { - String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName); - if (nameAsKey) - tableMap.put(tableNameStr, tableId); - else - tableMap.put(tableId, tableNameStr); - } + }); + } catch (ExecutionException e) { + throw new RuntimeException(e); } - - return tableMap; } public static String getTableId(Instance instance, String tableName) throws TableNotFoundException { @@ -129,12 +113,31 @@ public class Tables { return tableName; } - public static SortedMap<String,String> getNameToIdMap(Instance instance) { - return getMap(instance, true); + public static Map<String,String> getNameToIdMap(Instance instance) { + return getTableMap(instance).getNameToIdMap(); + } + + public static Map<String,String> getIdToNameMap(Instance instance) { + return getTableMap(instance).getIdtoNameMap(); } - public static SortedMap<String,String> getIdToNameMap(Instance instance) { - return getMap(instance, false); + /** + * Get the TableMap from the cache. A new one will be populated when needed. Cache is cleared manually by calling {@link #clearCache(Instance)} or + * automatically cleared by ZooCache watcher created in {@link #getZooCache(Instance)}. See ACCUMULO-4778. + */ + private static TableMap getTableMap(final Instance instance) { + TableMap map; + try { + map = instanceToMapCache.get(instance.getInstanceID(), new Callable<TableMap>() { + @Override + public TableMap call() { + return new TableMap(instance, getZooCache(instance)); + } + }); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + return map; } public static boolean exists(Instance instance, String tableId) { @@ -144,9 +147,9 @@ public class Tables { } public static void clearCache(Instance instance) { - cacheResetCount.incrementAndGet(); getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZTABLES); getZooCache(instance).clear(ZooUtil.getRoot(instance) + Constants.ZNAMESPACES); + instanceToMapCache.invalidate(instance.getInstanceID()); } /** @@ -158,17 +161,9 @@ public class Tables { * A zookeeper path */ public static void clearCacheByPath(Instance instance, final String zooPath) { - - String thePath; - - if (zooPath.startsWith("/")) { - thePath = zooPath; - } else { - thePath = "/" + zooPath; - } - + String thePath = zooPath.startsWith("/") ? zooPath : "/" + zooPath; getZooCache(instance).clear(ZooUtil.getRoot(instance) + thePath); - + instanceToMapCache.invalidate(instance.getInstanceID()); } public static String getPrintableTableNameFromId(Map<String,String> tidToNameMap, String tableId) { @@ -229,10 +224,6 @@ public class Tables { } - public static long getCacheResetCount() { - return cacheResetCount.get(); - } - public static String qualified(String tableName) { return qualified(tableName, Namespaces.DEFAULT_NAMESPACE); } diff --git a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java index f9720f0..fa5d8bb 100644 --- a/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java +++ b/test/src/test/java/org/apache/accumulo/test/MultiTableBatchWriterIT.java @@ -20,7 +20,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@ -31,7 +30,6 @@ import org.apache.accumulo.core.client.MultiTableBatchWriter; import org.apache.accumulo.core.client.MutationsRejectedException; import org.apache.accumulo.core.client.Scanner; import org.apache.accumulo.core.client.TableNotFoundException; -import org.apache.accumulo.core.client.TableOfflineException; import org.apache.accumulo.core.client.admin.TableOperations; import org.apache.accumulo.core.client.impl.ClientContext; import org.apache.accumulo.core.client.impl.Credentials; @@ -61,12 +59,12 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT { @Before public void setUpArgs() throws AccumuloException, AccumuloSecurityException { connector = getConnector(); - mtbw = getMultiTableBatchWriter(60); + mtbw = getMultiTableBatchWriter(); } - public MultiTableBatchWriter getMultiTableBatchWriter(long cacheTimeoutInSeconds) { + public MultiTableBatchWriter getMultiTableBatchWriter() { ClientContext context = new ClientContext(connector.getInstance(), new Credentials(getAdminPrincipal(), getAdminToken()), getCluster().getClientConfig()); - return new MultiTableBatchWriterImpl(context, new BatchWriterConfig(), cacheTimeoutInSeconds, TimeUnit.SECONDS); + return new MultiTableBatchWriterImpl(context, new BatchWriterConfig()); } @Test @@ -265,7 +263,7 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT { @Test public void testTableRenameNewWritersNoCaching() throws Exception { - mtbw = getMultiTableBatchWriter(0); + mtbw = getMultiTableBatchWriter(); try { final String[] names = getUniqueNames(4); @@ -406,113 +404,4 @@ public class MultiTableBatchWriterIT extends AccumuloClusterIT { Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected); } - - @Test - public void testOfflineTableWithCache() throws Exception { - boolean mutationsRejected = false; - - try { - final String[] names = getUniqueNames(2); - final String table1 = names[0], table2 = names[1]; - - TableOperations tops = connector.tableOperations(); - tops.create(table1); - tops.create(table2); - - BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2); - - Mutation m1 = new Mutation("foo"); - m1.put("col1", "", "val1"); - m1.put("col2", "", "val2"); - - bw1.addMutation(m1); - bw2.addMutation(m1); - - tops.offline(table1); - - try { - bw1 = mtbw.getBatchWriter(table1); - } catch (TableOfflineException e) { - // pass - mutationsRejected = true; - } - - tops.offline(table2); - - try { - bw2 = mtbw.getBatchWriter(table2); - } catch (TableOfflineException e) { - // pass - mutationsRejected = true; - } - } finally { - if (null != mtbw) { - try { - // Mutations might have flushed before the table offline occurred - mtbw.close(); - } catch (MutationsRejectedException e) { - // Pass - mutationsRejected = true; - } - } - } - - Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected); - } - - @Test - public void testOfflineTableWithoutCache() throws Exception { - mtbw = getMultiTableBatchWriter(0); - boolean mutationsRejected = false; - - try { - final String[] names = getUniqueNames(2); - final String table1 = names[0], table2 = names[1]; - - TableOperations tops = connector.tableOperations(); - tops.create(table1); - tops.create(table2); - - BatchWriter bw1 = mtbw.getBatchWriter(table1), bw2 = mtbw.getBatchWriter(table2); - - Mutation m1 = new Mutation("foo"); - m1.put("col1", "", "val1"); - m1.put("col2", "", "val2"); - - bw1.addMutation(m1); - bw2.addMutation(m1); - - // Mutations might or might not flush before tables goes offline - tops.offline(table1); - tops.offline(table2); - - try { - bw1 = mtbw.getBatchWriter(table1); - Assert.fail(table1 + " should be offline"); - } catch (TableOfflineException e) { - // pass - mutationsRejected = true; - } - - try { - bw2 = mtbw.getBatchWriter(table2); - Assert.fail(table1 + " should be offline"); - } catch (TableOfflineException e) { - // pass - mutationsRejected = true; - } - } finally { - if (null != mtbw) { - try { - // Mutations might have flushed before the table offline occurred - mtbw.close(); - } catch (MutationsRejectedException e) { - // Pass - mutationsRejected = true; - } - } - } - - Assert.assertTrue("Expected mutations to be rejected.", mutationsRejected); - } } -- To stop receiving notification emails like this one, please contact mmil...@apache.org.