This is an automated email from the ASF dual-hosted git repository. mmiller pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 0cb58c519cb058d6a1029f2fe39f6071ca6b5ab8 Merge: 19f819d d6d8a7d Author: Mike Miller <mmil...@apache.org> AuthorDate: Tue Jan 30 19:02:46 2018 -0500 Merge branch '1.8' Conflicts: core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java .../client/impl/MultiTableBatchWriterImpl.java | 76 +-------- .../apache/accumulo/core/client/impl/TableMap.java | 101 ++++++++++++ .../apache/accumulo/core/client/impl/Tables.java | 174 ++++++++------------- .../server/util/VerifyTabletAssignments.java | 2 +- .../org/apache/accumulo/master/tableOps/Utils.java | 2 +- .../accumulo/test/MultiTableBatchWriterIT.java | 119 +------------- 6 files changed, 173 insertions(+), 301 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java index aa0d469,a4a5b2f..255aa01 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/MultiTableBatchWriterImpl.java @@@ -82,37 -71,13 +71,13 @@@ public class MultiTableBatchWriterImpl } - /** - * CacheLoader which will look up the internal table ID for a given table name. - */ - private class TableNameToIdLoader extends CacheLoader<String,Table.ID> { - - @Override - public Table.ID load(String tableName) throws Exception { - Instance instance = context.getInstance(); - Table.ID tableId = Tables.getTableId(instance, tableName); - - if (Tables.getTableState(instance, tableId) == TableState.OFFLINE) - throw new TableOfflineException(instance, tableId.canonicalID()); - - return tableId; - } - - } - private TabletServerBatchWriter bw; - private ConcurrentHashMap<String,BatchWriter> tableWriters; + private ConcurrentHashMap<Table.ID,BatchWriter> tableWriters; private final ClientContext context; - private final LoadingCache<String,Table.ID> nameToIdCache; public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config) { - this(context, config, DEFAULT_CACHE_TIME, DEFAULT_CACHE_TIME_UNIT); - } - - public MultiTableBatchWriterImpl(ClientContext context, BatchWriterConfig config, long cacheTime, TimeUnit cacheTimeUnit) { checkArgument(context != null, "context is null"); checkArgument(config != null, "config is null"); - checkArgument(cacheTimeUnit != null, "cacheTimeUnit is null"); this.context = context; this.bw = new TabletServerBatchWriter(context, config); tableWriters = new ConcurrentHashMap<>(); @@@ -156,9 -116,9 +116,9 @@@ * The name of the table which to find the ID for * @return The table ID, or null if the table name doesn't exist */ - private String getId(String tableName) throws TableNotFoundException { + private Table.ID getId(String tableName) throws TableNotFoundException { try { - return nameToIdCache.get(tableName); + return Tables.getTableId(context.inst, tableName); } catch (UncheckedExecutionException e) { Throwable cause = e.getCause(); @@@ -194,27 -140,7 +140,7 @@@ public BatchWriter getBatchWriter(String tableName) throws AccumuloException, AccumuloSecurityException, TableNotFoundException { checkArgument(tableName != null, "tableName is null"); - while (true) { - long cacheResetCount = Tables.getCacheResetCount(); - - // cacheResetCount could change after this point in time, but I think thats ok because just want to ensure this methods sees changes - // made before it was called. - - long internalResetCount = cacheLastState.get(); - - if (cacheResetCount > internalResetCount) { - if (!cacheLastState.compareAndSet(internalResetCount, cacheResetCount)) { - continue; // concurrent operation, lets not possibly move cacheLastState backwards in the case where a thread pauses for along time - } - - nameToIdCache.invalidateAll(); - break; - } - - break; - } - - String tableId = getId(tableName); + Table.ID tableId = getId(tableName); BatchWriter tbw = tableWriters.get(tableId); if (tbw == null) { diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java index 0000000,3f3d90c..9f17fde mode 000000,100644..100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/TableMap.java @@@ -1,0 -1,100 +1,101 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.core.client.impl; + + import static java.nio.charset.StandardCharsets.UTF_8; + import static org.apache.accumulo.core.client.impl.Tables.qualified; + + import java.util.HashMap; + import java.util.List; + import java.util.Map; + + import org.apache.accumulo.core.Constants; + import org.apache.accumulo.core.client.Instance; + import org.apache.accumulo.core.client.NamespaceNotFoundException; + import org.apache.accumulo.core.zookeeper.ZooUtil; + import org.apache.accumulo.fate.zookeeper.ZooCache; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import com.google.common.collect.ImmutableMap; + + /** + * Used for thread safe caching of immutable table ID maps. See ACCUMULO-4778. + */ + public class TableMap { + private static final Logger log = LoggerFactory.getLogger(TableMap.class); + - private final Map<String,String> tableNameToIdMap; - private final Map<String,String> tableIdToNameMap; ++ private final Map<String,Table.ID> tableNameToIdMap; ++ private final Map<Table.ID,String> tableIdToNameMap; + + public TableMap(Instance instance, ZooCache zooCache) { + List<String> tableIds = zooCache.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); - Map<String,String> namespaceIdToNameMap = new HashMap<>(); - ImmutableMap.Builder<String,String> tableNameToIdBuilder = new ImmutableMap.Builder<>(); - ImmutableMap.Builder<String,String> tableIdToNameBuilder = new ImmutableMap.Builder<>(); ++ Map<Namespace.ID,String> namespaceIdToNameMap = new HashMap<>(); ++ ImmutableMap.Builder<String,Table.ID> tableNameToIdBuilder = new ImmutableMap.Builder<>(); ++ ImmutableMap.Builder<Table.ID,String> tableIdToNameBuilder = new ImmutableMap.Builder<>(); + // use StringBuilder to construct zPath string efficiently across many tables + StringBuilder zPathBuilder = new StringBuilder(); + zPathBuilder.append(ZooUtil.getRoot(instance)).append(Constants.ZTABLES).append("/"); + int prefixLength = zPathBuilder.length(); + - for (String tableId : tableIds) { ++ for (String tableIdStr : tableIds) { + // reset StringBuilder to prefix length before appending ID and suffix + zPathBuilder.setLength(prefixLength); - zPathBuilder.append(tableId).append(Constants.ZTABLE_NAME); ++ zPathBuilder.append(tableIdStr).append(Constants.ZTABLE_NAME); + byte[] tableName = zooCache.get(zPathBuilder.toString()); + zPathBuilder.setLength(prefixLength); - zPathBuilder.append(tableId).append(Constants.ZTABLE_NAMESPACE); ++ zPathBuilder.append(tableIdStr).append(Constants.ZTABLE_NAMESPACE); + byte[] nId = zooCache.get(zPathBuilder.toString()); + - String namespaceName = Namespaces.DEFAULT_NAMESPACE; ++ String namespaceName = Namespace.DEFAULT; + // create fully qualified table name + if (nId == null) { + namespaceName = null; + } else { - String namespaceId = new String(nId, UTF_8); - if (!namespaceId.equals(Namespaces.DEFAULT_NAMESPACE_ID)) { ++ Namespace.ID namespaceId = Namespace.ID.of(new String(nId, UTF_8)); ++ if (!namespaceId.equals(Namespace.ID.DEFAULT)) { + try { + namespaceName = namespaceIdToNameMap.get(namespaceId); + if (namespaceName == null) { + namespaceName = Namespaces.getNamespaceName(instance, namespaceId); + namespaceIdToNameMap.put(namespaceId, namespaceName); + } + } catch (NamespaceNotFoundException e) { - log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); ++ log.error("Table (" + tableIdStr + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); + continue; + } + } + } + if (tableName != null && namespaceName != null) { + String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName); ++ Table.ID tableId = Table.ID.of(tableIdStr); + tableNameToIdBuilder.put(tableNameStr, tableId); + tableIdToNameBuilder.put(tableId, tableNameStr); + } + } + tableNameToIdMap = tableNameToIdBuilder.build(); + tableIdToNameMap = tableIdToNameBuilder.build(); + } + - public Map<String,String> getNameToIdMap() { ++ public Map<String,Table.ID> getNameToIdMap() { + return tableNameToIdMap; + } + - public Map<String,String> getIdtoNameMap() { ++ public Map<Table.ID,String> getIdtoNameMap() { + return tableIdToNameMap; + } + } diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java index 512c875,a93347c..94bd193 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/Tables.java @@@ -19,14 -19,12 +19,11 @@@ package org.apache.accumulo.core.client import static com.google.common.base.Preconditions.checkArgument; import static java.nio.charset.StandardCharsets.UTF_8; - import java.util.ArrayList; - import java.util.HashMap; + import java.security.SecurityPermission; import java.util.List; import java.util.Map; - import java.util.SortedMap; - import java.util.TreeMap; - import java.util.concurrent.atomic.AtomicLong; - import java.util.function.BiConsumer; -import java.util.concurrent.Callable; + import java.util.concurrent.ExecutionException; + import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.Instance; @@@ -38,37 -36,61 +35,57 @@@ import org.apache.accumulo.core.util.Pa import org.apache.accumulo.core.zookeeper.ZooUtil; import org.apache.accumulo.fate.zookeeper.ZooCache; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; - import org.slf4j.Logger; - import org.slf4j.LoggerFactory; -import org.apache.zookeeper.WatchedEvent; -import org.apache.zookeeper.Watcher; + + import com.google.common.cache.Cache; + import com.google.common.cache.CacheBuilder; public class Tables { - private static final Logger log = LoggerFactory.getLogger(Tables.class); public static final String VALID_NAME_REGEX = "^(\\w+\\.)?(\\w+)$"; - private static final AtomicLong cacheResetCount = new AtomicLong(0); - - private static ZooCache getZooCache(Instance instance) { - return new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); - } + private static final SecurityPermission TABLES_PERMISSION = new SecurityPermission("tablesPermission"); + // Per instance cache will expire after 10 minutes in case we encounter an instance not used frequently + private static Cache<String,TableMap> instanceToMapCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); + private static Cache<String,ZooCache> instanceToZooCache = CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build(); /** + * Lookup table ID in ZK. Throw TableNotFoundException if not found. Also wraps NamespaceNotFoundException in TableNotFoundException if namespace is not + * found. + */ + public static Table.ID getTableId(Instance instance, String tableName) throws TableNotFoundException { + try { + return _getTableId(instance, tableName); + } catch (NamespaceNotFoundException e) { + throw new TableNotFoundException(tableName, e); + } + } + + /** + * Return the cached ZooCache for provided instance. ZooCache is initially created with a watcher that will clear the TableMap cache for that instance when + * WatchedEvent occurs. + */ + private static ZooCache getZooCache(final Instance instance) { + SecurityManager sm = System.getSecurityManager(); + if (sm != null) { + sm.checkPermission(TABLES_PERMISSION); + } + final String zks = instance.getZooKeepers(); + final int timeOut = instance.getZooKeepersSessionTimeOut(); + final String uuid = instance.getInstanceID(); + + try { - return instanceToZooCache.get(uuid, new Callable<ZooCache>() { - @Override - public ZooCache call() { - return new ZooCacheFactory().getZooCache(zks, timeOut, new Watcher() { - @Override - public void process(WatchedEvent watchedEvent) { - instanceToMapCache.invalidate(uuid); - } - }); - } - }); ++ return instanceToZooCache.get(uuid, () -> new ZooCacheFactory().getZooCache(zks, timeOut, ++ watchedEvent -> instanceToMapCache.invalidate(uuid))); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + - public static String getTableId(Instance instance, String tableName) throws TableNotFoundException { - try { - return _getTableId(instance, tableName); - } catch (NamespaceNotFoundException e) { - throw new TableNotFoundException(tableName, e); - } - } - - public static String _getTableId(Instance instance, String tableName) throws NamespaceNotFoundException, TableNotFoundException { - String tableId = getNameToIdMap(instance).get(tableName); ++ /** + * Lookup table ID in ZK. If not found, clears cache and tries again. + */ + public static Table.ID _getTableId(Instance instance, String tableName) throws NamespaceNotFoundException, TableNotFoundException { - Table.ID tableId = lookupTableId(instance, tableName); ++ Table.ID tableId = getNameToIdMap(instance).get(tableName); if (tableId == null) { // maybe the table exist, but the cache was not updated yet... so try to clear the cache and check again clearCache(instance); @@@ -84,12 -106,44 +101,39 @@@ return tableId; } - public static String getTableName(Instance instance, String tableId) throws TableNotFoundException { ++ public static String getTableName(Instance instance, Table.ID tableId) throws TableNotFoundException { + String tableName = getIdToNameMap(instance).get(tableId); + if (tableName == null) - throw new TableNotFoundException(tableId, null, null); ++ throw new TableNotFoundException(tableId.canonicalID(), null, null); + return tableName; + } + - public static Map<String,String> getNameToIdMap(Instance instance) { ++ public static Map<String,Table.ID> getNameToIdMap(Instance instance) { + return getTableMap(instance).getNameToIdMap(); + } + - public static Map<String,String> getIdToNameMap(Instance instance) { ++ public static Map<Table.ID,String> getIdToNameMap(Instance instance) { + return getTableMap(instance).getIdtoNameMap(); + } + + /** + * Get the TableMap from the cache. A new one will be populated when needed. Cache is cleared manually by calling {@link #clearCache(Instance)} or + * automatically cleared by ZooCache watcher created in {@link #getZooCache(Instance)}. See ACCUMULO-4778. + */ + private static TableMap getTableMap(final Instance instance) { + TableMap map; + try { - map = instanceToMapCache.get(instance.getInstanceID(), new Callable<TableMap>() { - @Override - public TableMap call() { - return new TableMap(instance, getZooCache(instance)); - } - }); ++ map = instanceToMapCache.get(instance.getInstanceID(), () -> new TableMap(instance, getZooCache(instance))); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + return map; + } + - public static boolean exists(Instance instance, String tableId) { + public static boolean exists(Instance instance, Table.ID tableId) { - if (tableId == null) - return false; ZooCache zc = getZooCache(instance); List<String> tableIds = zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); - return tableIds.contains(tableId); + return tableIds.contains(tableId.canonicalID()); } public static void clearCache(Instance instance) { @@@ -107,20 -161,17 +151,12 @@@ * A zookeeper path */ public static void clearCacheByPath(Instance instance, final String zooPath) { - - String thePath; - - if (zooPath.startsWith("/")) { - thePath = zooPath; - } else { - thePath = "/" + zooPath; - } - + String thePath = zooPath.startsWith("/") ? zooPath : "/" + zooPath; getZooCache(instance).clear(ZooUtil.getRoot(instance) + thePath); - + instanceToMapCache.invalidate(instance.getInstanceID()); } - public static String getPrintableTableNameFromId(Map<String,String> tidToNameMap, String tableId) { - String tableName = tidToNameMap.get(tableId); - return tableName == null ? "(ID:" + tableId + ")" : tableName; - } - - public static String getPrintableTableInfoFromId(Instance instance, String tableId) { + public static String getPrintableTableInfoFromId(Instance instance, Table.ID tableId) { String tableName = null; try { tableName = getTableName(instance, tableId); @@@ -173,12 -224,8 +209,8 @@@ } - public static long getCacheResetCount() { - return cacheResetCount.get(); - } - public static String qualified(String tableName) { - return qualified(tableName, Namespaces.DEFAULT_NAMESPACE); + return qualified(tableName, Namespace.DEFAULT); } public static String qualified(String tableName, String defaultNamespace) { @@@ -225,85 -272,10 +257,9 @@@ // We might get null out of ZooCache if this tableID doesn't exist if (null == n) { - throw new TableNotFoundException(tableId, null, null); + throw new TableNotFoundException(tableId.canonicalID(), null, null); } - return new String(n, UTF_8); + return Namespace.ID.of(new String(n, UTF_8)); } - - /** - * Get all table Ids and table names from ZK. The biConsumer accepts the first arg (t) as the table ID and second arg (u) as the table name. - */ - private static void getAllTables(Instance instance, BiConsumer<String,String> biConsumer) { - ZooCache zc = getZooCache(instance); - List<String> tableIds = zc.getChildren(ZooUtil.getRoot(instance) + Constants.ZTABLES); - Map<Namespace.ID,String> namespaceIdToNameMap = new HashMap<>(); - - for (String tableId : tableIds) { - byte[] tableName = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAME); - byte[] nId = zc.get(ZooUtil.getRoot(instance) + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_NAMESPACE); - String namespaceName = Namespace.DEFAULT; - // create fully qualified table name - if (nId == null) { - namespaceName = null; - } else { - Namespace.ID namespaceId = Namespace.ID.of(new String(nId, UTF_8)); - if (!namespaceId.equals(Namespace.ID.DEFAULT)) { - try { - namespaceName = namespaceIdToNameMap.get(namespaceId); - if (namespaceName == null) { - namespaceName = Namespaces.getNamespaceName(instance, namespaceId); - namespaceIdToNameMap.put(namespaceId, namespaceName); - } - } catch (NamespaceNotFoundException e) { - log.error("Table (" + tableId + ") contains reference to namespace (" + namespaceId + ") that doesn't exist", e); - continue; - } - } - } - if (tableName != null && namespaceName != null) { - String tableNameStr = qualified(new String(tableName, UTF_8), namespaceName); - biConsumer.accept(tableId, tableNameStr); - } - } - } - - public static SortedMap<Table.ID,String> getIdToNameMap(Instance instance) { - SortedMap<Table.ID,String> map = new TreeMap<>(); - getAllTables(instance, (id, name) -> map.put(Table.ID.of(id), name)); - return map; - } - - public static SortedMap<String,Table.ID> getNameToIdMap(Instance instance) { - SortedMap<String,Table.ID> map = new TreeMap<>(); - getAllTables(instance, (id, name) -> map.put(name, Table.ID.of(id))); - return map; - } - - /** - * Lookup the table name in ZK. Fail quietly, returning null if not found. - */ - public static Table.ID lookupTableId(Instance instance, String tableName) { - ArrayList<Table.ID> singleId = new ArrayList<>(1); - getAllTables(instance, (id, name) -> { - if (name.equals(tableName)) - singleId.add(Table.ID.of(id)); - }); - if (singleId.isEmpty()) - return null; - else - return singleId.get(0); - } - - public static String getTableName(Instance instance, Table.ID tableId) throws TableNotFoundException { - ArrayList<String> singleName = new ArrayList<>(1); - getAllTables(instance, (id, name) -> { - if (id.equals(tableId.canonicalID())) - singleName.add(name); - }); - if (singleName.isEmpty()) - throw new TableNotFoundException(tableId.canonicalID(), null, null); -- - return singleName.get(0); - } } diff --cc server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java index cdf7e3c,198ab95..3ee943b --- a/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/VerifyTabletAssignments.java @@@ -90,7 -89,7 +90,7 @@@ public class VerifyTabletAssignments TreeMap<KeyExtent,String> tabletLocations = new TreeMap<>(); - Table.ID tableId = Tables.lookupTableId(context.getInstance(), tableName); - String tableId = Tables.getNameToIdMap(context.getInstance()).get(tableName); ++ Table.ID tableId = Tables.getNameToIdMap(context.getInstance()).get(tableName); MetadataServicer.forTableId(context, tableId).getTabletLocations(tabletLocations); final HashSet<KeyExtent> failures = new HashSet<>(); diff --cc server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java index 96e954e,f58c3ee..b1e2228 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/Utils.java @@@ -49,10 -46,10 +49,10 @@@ public class Utils private static final byte[] ZERO_BYTE = new byte[] {'0'}; private static final Logger log = LoggerFactory.getLogger(Utils.class); - static void checkTableDoesNotExist(Instance instance, String tableName, String tableId, TableOperation operation) + static void checkTableDoesNotExist(Instance instance, String tableName, Table.ID tableId, TableOperation operation) throws AcceptableThriftTableOperationException { - Table.ID id = Tables.lookupTableId(instance, tableName); - String id = Tables.getNameToIdMap(instance).get(tableName); ++ Table.ID id = Tables.getNameToIdMap(instance).get(tableName); if (id != null && !id.equals(tableId)) throw new AcceptableThriftTableOperationException(null, tableName, operation, TableOperationExceptionType.EXISTS, null); -- To stop receiving notification emails like this one, please contact mmil...@apache.org.