This is an automated email from the ASF dual-hosted git repository. kturner pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit afed8a98d6d7fa69127540165928e18234a584a7 Merge: 1325e81 0bd076e Author: Keith Turner <ktur...@apache.org> AuthorDate: Wed Feb 14 18:21:46 2018 -0500 Merge branch '1.8' .../accumulo/core/client/impl/ClientContext.java | 70 +++++-- .../accumulo/core/client/impl/ThriftScanner.java | 6 +- .../core/client/impl/ThriftTransportKey.java | 4 + .../core/client/impl/ThriftTransportPool.java | 141 +++++++------- .../org/apache/accumulo/core/data/Mutation.java | 4 +- .../apache/accumulo/fate/zookeeper/ZooCache.java | 77 +++++--- .../apache/accumulo/fate/zookeeper/ZooLock.java | 7 +- .../accumulo/fate/zookeeper/ZooCacheTest.java | 12 +- .../accumulo/server/master/LiveTServerSet.java | 4 +- .../org/apache/accumulo/server/util/AdminTest.java | 6 +- .../monitor/util/AccumuloMonitorAppender.java | 4 +- .../apache/accumulo/tserver/session/Session.java | 7 +- .../accumulo/tserver/session/SessionManager.java | 215 +++++++++++++-------- 13 files changed, 344 insertions(+), 213 deletions(-) diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java index 356fa02,f54a7a9..1fec4a3 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ClientContext.java @@@ -22,7 -22,7 +22,8 @@@ import static java.util.Objects.require import java.io.IOException; import java.util.Iterator; import java.util.Map; + import java.util.concurrent.TimeUnit; +import java.util.function.Predicate; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; @@@ -41,6 -41,11 +42,9 @@@ import org.apache.accumulo.core.securit import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Predicate; + import com.google.common.base.Supplier; + import com.google.common.base.Suppliers; + /** * This class represents any essential configuration and credentials needed to initiate RPC operations throughout the code. It is intended to represent a shared * object that contains these things from when the client was first constructed. It is not public API, and is only an internal representation of the context in @@@ -58,14 -62,18 +62,23 @@@ public class ClientContext private final AccumuloConfiguration rpcConf; protected Connector conn; + // These fields are very frequently accessed (each time a connection is created) and expensive to compute, so cache them. + private Supplier<Long> timeoutSupplier; + private Supplier<SaslConnectionParams> saslSupplier; + private Supplier<SslConnectionParams> sslSupplier; + private TCredentials rpcCreds; + + /** + * Instantiate a client context + */ public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf) { + this(instance, credentials, clientConf, new BatchWriterConfig()); + } + + public ClientContext(Instance instance, Credentials credentials, ClientConfiguration clientConf, BatchWriterConfig batchWriterConfig) { this(instance, credentials, convertClientConfig(requireNonNull(clientConf, "clientConf is null"))); this.clientConf = clientConf; + this.batchWriterConfig = batchWriterConfig; } /** @@@ -76,6 -84,43 +89,33 @@@ creds = requireNonNull(credentials, "credentials is null"); rpcConf = requireNonNull(serverConf, "serverConf is null"); clientConf = null; + - timeoutSupplier = new Supplier<Long>() { - @Override - public Long get() { - return getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); - } - }; ++ timeoutSupplier = () -> getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT); + - sslSupplier = new Supplier<SslConnectionParams>() { - @Override - public SslConnectionParams get() { - return SslConnectionParams.forClient(getConfiguration()); - } - }; ++ sslSupplier = () -> SslConnectionParams.forClient(getConfiguration()); + + saslSupplier = new Supplier<SaslConnectionParams>() { + @Override + public SaslConnectionParams get() { + // Use the clientConf if we have it + if (null != clientConf) { + if (!clientConf.hasSasl()) { + return null; + } + return new SaslConnectionParams(clientConf, getCredentials().getToken()); + } + AccumuloConfiguration conf = getConfiguration(); + if (!conf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + return null; + } + return new SaslConnectionParams(conf, getCredentials().getToken()); + } + }; - ++ + timeoutSupplier = Suppliers.memoizeWithExpiration(timeoutSupplier, 100, TimeUnit.MILLISECONDS); + sslSupplier = Suppliers.memoizeWithExpiration(sslSupplier, 100, TimeUnit.MILLISECONDS); + saslSupplier = Suppliers.memoizeWithExpiration(saslSupplier, 100, TimeUnit.MILLISECONDS); + } /** diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java index 3472476,b7b1c67..d6637e4 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftScanner.java @@@ -414,11 -414,12 +414,11 @@@ public class ThriftScanner scanState.prevLoc = loc; if (scanState.scanID == null) { - String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil=" - + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions + " context=" + scanState.classLoaderContext; - Thread.currentThread().setName(msg); - + Thread.currentThread().setName("Starting scan tserver=" + loc.tablet_location + " tableId=" + loc.tablet_extent.getTableId()); if (log.isTraceEnabled()) { + String msg = "Starting scan tserver=" + loc.tablet_location + " tablet=" + loc.tablet_extent + " range=" + scanState.range + " ssil=" - + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions; ++ + scanState.serverSideIteratorList + " ssio=" + scanState.serverSideIteratorOptions + " context=" + scanState.classLoaderContext; log.trace("tid={} {}", Thread.currentThread().getId(), msg); timer = new OpTimer().start(); } diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java index 9d07af2,c221607..93c3432 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ThriftTransportPool.java @@@ -40,8 -41,11 +40,10 @@@ import org.slf4j.Logger import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; + import com.google.common.base.Preconditions; + import com.google.common.collect.Iterables; public class ThriftTransportPool { - private static SecurityPermission TRANSPORT_POOL_PERMISSION = new SecurityPermission("transportPoolPermission"); private static final Random random = new Random(); private long killTime = 1000 * 3; @@@ -426,13 -449,11 +447,11 @@@ Collections.shuffle(cachedServers, random); for (ThriftTransportKey ttk : cachedServers) { - for (CachedConnection cachedConnection : getCache().get(ttk)) { - if (!cachedConnection.isReserved()) { - cachedConnection.setReserved(true); - final String serverAddr = ttk.getServer().toString(); - log.trace("Using existing connection to {}", serverAddr); - return new Pair<>(serverAddr, cachedConnection.transport); - } + CachedConnection cachedConnection = getCache().get(ttk).reserveAny(); + if (cachedConnection != null) { + final String serverAddr = ttk.getServer().toString(); + log.trace("Using existing connection to {}", serverAddr); - return new Pair<String,TTransport>(serverAddr, cachedConnection.transport); ++ return new Pair<>(serverAddr, cachedConnection.transport); } } } @@@ -446,15 -467,12 +465,12 @@@ if (preferCachedConnection) { synchronized (this) { - List<CachedConnection> cachedConnList = getCache().get(ttk); - if (cachedConnList != null) { - for (CachedConnection cachedConnection : cachedConnList) { - if (!cachedConnection.isReserved()) { - cachedConnection.setReserved(true); - final String serverAddr = ttk.getServer().toString(); - log.trace("Using existing connection to {} timeout {}", serverAddr, ttk.getTimeout()); - return new Pair<>(serverAddr, cachedConnection.transport); - } + CachedConnections cachedConns = getCache().get(ttk); + if (cachedConns != null) { + CachedConnection cachedConnection = cachedConns.reserveAny(); + if (cachedConnection != null) { + final String serverAddr = ttk.getServer().toString(); - return new Pair<String,TTransport>(serverAddr, cachedConnection.transport); ++ return new Pair<>(serverAddr, cachedConnection.transport); } } } diff --cc core/src/main/java/org/apache/accumulo/core/data/Mutation.java index 5e1a7ba,ebc72f5..6ebdcb0 --- a/core/src/main/java/org/apache/accumulo/core/data/Mutation.java +++ b/core/src/main/java/org/apache/accumulo/core/data/Mutation.java @@@ -321,9 -306,6 +321,9 @@@ public class Mutation implements Writab if (buffer == null) { throw new IllegalStateException("Can not add to mutation after serializing it"); } - long estimatedSizeAfterPut = estRowAndLargeValSize + buffer.size() + cfLength + cqLength + cv.length + (hasts ? 8 : 0) + valLength + 2 - + 4 * SERIALIZATION_OVERHEAD; ++ long estimatedSizeAfterPut = estRowAndLargeValSize + buffer.size() + cfLength + cqLength + cv.length + (hasts ? 8 : 0) + valLength + 2 + 4 ++ * SERIALIZATION_OVERHEAD; + Preconditions.checkArgument(estimatedSizeAfterPut < MAX_MUTATION_SIZE && estimatedSizeAfterPut >= 0, "Maximum mutation size must be less than 2GB "); put(cf, cfLength); put(cq, cqLength); put(cv); diff --cc fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java index 1d8a64e,2ef938b..afe7d37 --- a/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java +++ b/fate/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java @@@ -65,9 -60,34 +60,39 @@@ public class ZooCache private final ZooReader zReader; + public static class ZcStat { + private long ephemeralOwner; ++ private long mzxid; + - public ZcStat() { - - } ++ public ZcStat() {} + + private ZcStat(Stat stat) { + this.ephemeralOwner = stat.getEphemeralOwner(); ++ this.mzxid = stat.getMzxid(); + } + + public long getEphemeralOwner() { + return ephemeralOwner; + } + + private void set(ZcStat cachedStat) { + this.ephemeralOwner = cachedStat.ephemeralOwner; ++ this.mzxid = cachedStat.mzxid; + } + + @VisibleForTesting + public void setEphemeralOwner(long ephemeralOwner) { + this.ephemeralOwner = ephemeralOwner; + } ++ ++ public long getMzxid() { ++ return mzxid; ++ } + } + private static class ImmutableCacheCopies { final Map<String,byte[]> cache; - final Map<String,Stat> statCache; + final Map<String,ZcStat> statCache; final Map<String,List<String>> childrenCache; final long updateCount; @@@ -369,11 -390,11 +395,11 @@@ throw new ConcurrentModificationException(); } if (log.isTraceEnabled()) { - log.trace("zookeeper contained " + zPath + " " + (data == null ? null : new String(data, UTF_8))); + log.trace("zookeeper contained {} {}", zPath, (data == null ? null : new String(data, UTF_8))); } } - put(zPath, data, stat); - copyStats(status, stat); + put(zPath, data, zstat); + copyStats(status, zstat); return data; } finally { cacheWriteLock.unlock(); diff --cc server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java index 5a24e4f,0000000..07c886b mode 100644,000000..100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/util/AccumuloMonitorAppender.java @@@ -1,221 -1,0 +1,221 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.monitor.util; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; +import java.util.function.Supplier; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Instance; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.zookeeper.ZooCache; ++import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat; +import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; +import org.apache.accumulo.server.client.HdfsZooInstance; +import org.apache.log4j.AppenderSkeleton; +import org.apache.log4j.AsyncAppender; +import org.apache.log4j.net.SocketAppender; - import org.apache.zookeeper.data.Stat; + +public class AccumuloMonitorAppender extends AsyncAppender implements AutoCloseable { + + final ScheduledExecutorService executorService; + final AtomicBoolean trackerScheduled; + private int frequency = 0; + private MonitorTracker tracker = null; + + /** + * A Log4j Appender which follows the registered location of the active Accumulo monitor service, and forwards log messages to it + */ + public AccumuloMonitorAppender() { + // create the background thread to watch for updates to monitor location + trackerScheduled = new AtomicBoolean(false); + executorService = Executors.newSingleThreadScheduledExecutor(runnable -> { + Thread t = new Thread(runnable, AccumuloMonitorAppender.class.getSimpleName() + " Location Tracker"); + t.setDaemon(true); + return t; + }); + } + + public void setFrequency(int millis) { + if (millis > 0) { + frequency = millis; + } + } + + public int getFrequency() { + return frequency; + } + + // this is just for testing + void setTracker(MonitorTracker monitorTracker) { + tracker = monitorTracker; + } + + @Override + public void activateOptions() { + // only schedule it once (in case options get activated more than once); not sure if this is possible + if (trackerScheduled.compareAndSet(false, true)) { + if (frequency <= 0) { + // use default rate of 5 seconds between each check + frequency = 5000; + } + if (tracker == null) { + tracker = new MonitorTracker(this, new ZooCacheLocationSupplier(), new SocketAppenderFactory()); + } + executorService.scheduleWithFixedDelay(tracker, frequency, frequency, TimeUnit.MILLISECONDS); + } + super.activateOptions(); + } + + @Override + public void close() { + if (!executorService.isShutdown()) { + executorService.shutdownNow(); + } + super.close(); + } + + static class MonitorLocation { + private final String location; + private final long modId; + + public MonitorLocation(long modId, byte[] location) { + this.modId = modId; + this.location = location == null ? null : new String(location, UTF_8); + } + + public boolean hasLocation() { + return location != null; + } + + public String getLocation() { + return location; + } + + @Override + public boolean equals(Object obj) { + if (obj != null && obj instanceof MonitorLocation) { + MonitorLocation other = (MonitorLocation) obj; + return modId == other.modId && Objects.equals(location, other.location); + } + return false; + } + + @Override + public int hashCode() { + return Long.hashCode(modId); + } + } + + private static class ZooCacheLocationSupplier implements Supplier<MonitorLocation> { + + // path and zooCache are lazily set the first time this tracker is run + // this allows the tracker to be constructed and scheduled during log4j initialization without + // triggering any actual logs from the Accumulo or ZooKeeper code + private String path = null; + private ZooCache zooCache = null; + + @Override + public MonitorLocation get() { + // lazily set up path and zooCache (see comment in constructor) + if (this.zooCache == null) { + Instance instance = HdfsZooInstance.getInstance(); + this.path = ZooUtil.getRoot(instance) + Constants.ZMONITOR_LOG4J_ADDR; + this.zooCache = new ZooCacheFactory().getZooCache(instance.getZooKeepers(), instance.getZooKeepersSessionTimeOut()); + } + + // get the current location from the cache and update if necessary - Stat stat = new Stat(); ++ ZcStat stat = new ZcStat(); + byte[] loc = zooCache.get(path, stat); + // mzxid is 0 if location does not exist and the non-zero transaction id of the last modification otherwise + return new MonitorLocation(stat.getMzxid(), loc); + } + } + + private static class SocketAppenderFactory implements Function<MonitorLocation,AppenderSkeleton> { + @Override + public AppenderSkeleton apply(MonitorLocation loc) { + int defaultPort = Integer.parseUnsignedInt(Property.MONITOR_LOG4J_PORT.getDefaultValue()); + HostAndPort remote = HostAndPort.fromString(loc.getLocation()); + + SocketAppender socketAppender = new SocketAppender(); + socketAppender.setApplication(System.getProperty("accumulo.application", "unknown")); + socketAppender.setRemoteHost(remote.getHost()); + socketAppender.setPort(remote.getPortOrDefault(defaultPort)); + + return socketAppender; + } + } + + static class MonitorTracker implements Runnable { + + private final AccumuloMonitorAppender parentAsyncAppender; + private final Supplier<MonitorLocation> currentLocationSupplier; + private final Function<MonitorLocation,AppenderSkeleton> appenderFactory; + + private MonitorLocation lastLocation; + private AppenderSkeleton lastSocketAppender; + + public MonitorTracker(AccumuloMonitorAppender appender, Supplier<MonitorLocation> currentLocationSupplier, + Function<MonitorLocation,AppenderSkeleton> appenderFactory) { + this.parentAsyncAppender = Objects.requireNonNull(appender); + this.appenderFactory = Objects.requireNonNull(appenderFactory); + this.currentLocationSupplier = Objects.requireNonNull(currentLocationSupplier); + + this.lastLocation = new MonitorLocation(0, null); + this.lastSocketAppender = null; + } + + @Override + public void run() { + try { + MonitorLocation currentLocation = currentLocationSupplier.get(); + // detect change + if (!currentLocation.equals(lastLocation)) { + // clean up old appender + if (lastSocketAppender != null) { + parentAsyncAppender.removeAppender(lastSocketAppender); + lastSocketAppender.close(); + lastSocketAppender = null; + } + // create a new one + if (currentLocation.hasLocation()) { + lastSocketAppender = appenderFactory.apply(currentLocation); + lastSocketAppender.activateOptions(); + parentAsyncAppender.addAppender(lastSocketAppender); + } + // update the last location only if switching was successful + lastLocation = currentLocation; + } + } catch (Exception e) { + // dump any non-fatal problems to the console, but let it run again + e.printStackTrace(); + } + } + + } + +} diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index 5f0a962,e0fb795..06bd6a5 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@@ -26,8 -27,9 +27,10 @@@ import java.util.Map import java.util.Map.Entry; import java.util.Set; import java.util.TimerTask; + import java.util.concurrent.ConcurrentHashMap; + import java.util.concurrent.ConcurrentMap; +import org.apache.accumulo.core.client.impl.Table; import org.apache.accumulo.core.client.impl.Translator; import org.apache.accumulo.core.client.impl.Translators; import org.apache.accumulo.core.conf.AccumuloConfiguration; @@@ -171,19 -218,23 +219,22 @@@ public class SessionManager private void sweep(final long maxIdle, final long maxUpdateIdle) { List<Session> sessionsToCleanup = new ArrayList<>(); - synchronized (this) { - Iterator<Session> iter = sessions.values().iterator(); - while (iter.hasNext()) { - Session session = iter.next(); - long configuredIdle = maxIdle; - if (session instanceof UpdateSession) { - configuredIdle = maxUpdateIdle; - } - long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > configuredIdle && !session.reserved) { - log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(), session.client, idleTime); - iter.remove(); - sessionsToCleanup.add(session); - + Iterator<Session> iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); + synchronized (session) { + if (session.state == State.UNRESERVED) { + long configuredIdle = maxIdle; + if (session instanceof UpdateSession) { + configuredIdle = maxUpdateIdle; + } + long idleTime = System.currentTimeMillis() - session.lastAccessTime; + if (idleTime > configuredIdle) { - log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); ++ log.info("Closing idle session from user={}, client={}, idle={}ms", session.getUser(), session.client, idleTime); + iter.remove(); + sessionsToCleanup.add(session); + session.state = State.REMOVED; + } } } } @@@ -231,8 -289,8 +289,9 @@@ } } - public synchronized Map<Table.ID,MapCounter<ScanRunState>> getActiveScansPerTable() { - public Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { - Map<String,MapCounter<ScanRunState>> counts = new HashMap<>(); ++ public Map<Table.ID,MapCounter<ScanRunState>> getActiveScansPerTable() { + Map<Table.ID,MapCounter<ScanRunState>> counts = new HashMap<>(); ++ Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<>(); synchronized (idleSessions) { -- To stop receiving notification emails like this one, please contact ktur...@apache.org.