Merge branch '1.6' into 1.7 Conflicts: core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/d4882a15 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/d4882a15 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/d4882a15 Branch: refs/heads/master Commit: d4882a15fd85482855783e3a56babc14d31bb5bb Parents: 0faf8b9 21d2f61 Author: Keith Turner <ktur...@apache.org> Authored: Wed Jan 27 19:57:22 2016 -0500 Committer: Keith Turner <ktur...@apache.org> Committed: Wed Jan 27 19:57:22 2016 -0500 ---------------------------------------------------------------------- .../core/client/impl/CompressedIterators.java | 14 +- .../core/client/impl/ConditionalWriterImpl.java | 44 ++++- .../accumulo/core/iterators/IteratorUtil.java | 93 ++++++----- .../client/impl/ConditionalComparatorTest.java | 53 ++++++ .../tserver/ConditionCheckerContext.java | 164 +++++++++++++++++++ .../apache/accumulo/tserver/TabletServer.java | 76 ++------- .../accumulo/tserver/tablet/ScanDataSource.java | 23 ++- .../apache/accumulo/tserver/tablet/Tablet.java | 18 ++ .../accumulo/test/ConditionalWriterIT.java | 137 +++++++++++++++- 9 files changed, 507 insertions(+), 115 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index 24040e6,9030d77..c7756ad --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@@ -17,11 -17,11 +17,13 @@@ package org.apache.accumulo.core.client.impl; +import static java.nio.charset.StandardCharsets.UTF_8; + import java.nio.ByteBuffer; import java.util.ArrayList; + import java.util.Arrays; import java.util.Collections; + import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@@ -69,14 -69,13 +71,15 @@@ import org.apache.accumulo.core.securit import org.apache.accumulo.core.security.VisibilityParseException; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; import org.apache.accumulo.core.tabletserver.thrift.TabletClientService; +import org.apache.accumulo.core.trace.Trace; +import org.apache.accumulo.core.trace.Tracer; +import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.BadArgumentException; import org.apache.accumulo.core.util.ByteBufferUtil; -import org.apache.accumulo.core.util.LoggingRunnable; + import org.apache.accumulo.core.util.NamingThreadFactory; -import org.apache.accumulo.core.util.ThriftUtil; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.zookeeper.ZooUtil; +import org.apache.accumulo.fate.util.LoggingRunnable; import org.apache.accumulo.fate.zookeeper.ZooCacheFactory; import org.apache.accumulo.fate.zookeeper.ZooLock; import org.apache.accumulo.fate.zookeeper.ZooUtil.LockID; @@@ -379,12 -373,13 +382,12 @@@ class ConditionalWriterImpl implements } } - ConditionalWriterImpl(Instance instance, Credentials credentials, String tableId, ConditionalWriterConfig config) { - this.instance = instance; - this.credentials = credentials; + ConditionalWriterImpl(ClientContext context, String tableId, ConditionalWriterConfig config) { + this.context = context; this.auths = config.getAuthorizations(); this.ve = new VisibilityEvaluator(config.getAuthorizations()); - this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads()); + this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads(), new NamingThreadFactory(this.getClass().getSimpleName())); - this.locator = TabletLocator.getLocator(instance, new Text(tableId)); + this.locator = TabletLocator.getLocator(context, new Text(tableId)); this.serverQueues = new HashMap<String,ServerQueue>(); this.tableId = tableId; this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java ---------------------------------------------------------------------- diff --cc core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java index 031d13f,6f76d77..c739e56 --- a/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java +++ b/core/src/main/java/org/apache/accumulo/core/iterators/IteratorUtil.java @@@ -124,22 -109,27 +125,27 @@@ public class IteratorUtil return props; } - public static int getMaxPriority(IteratorScope scope, AccumuloConfiguration conf) { - List<IterInfo> iters = new ArrayList<IterInfo>(); - parseIterConf(scope, iters, new HashMap<String,Map<String,String>>(), conf); + public static void mergeIteratorConfig(List<IterInfo> destList, Map<String,Map<String,String>> destOpts, List<IterInfo> tableIters, + Map<String,Map<String,String>> tableOpts, List<IterInfo> ssi, Map<String,Map<String,String>> ssio) { + destList.addAll(tableIters); + destList.addAll(ssi); + Collections.sort(destList, new IterInfoComparator()); - int max = 0; - - for (IterInfo iterInfo : iters) { - if (iterInfo.priority > max) - max = iterInfo.priority; + Set<Entry<String,Map<String,String>>> es = tableOpts.entrySet(); + for (Entry<String,Map<String,String>> entry : es) { + if (entry.getValue() == null) { + destOpts.put(entry.getKey(), null); + } else { - destOpts.put(entry.getKey(), new HashMap<String,String>(entry.getValue())); ++ destOpts.put(entry.getKey(), new HashMap<>(entry.getValue())); + } } - return max; + IteratorUtil.mergeOptions(ssio, destOpts); + } - protected static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) { + public static void parseIterConf(IteratorScope scope, List<IterInfo> iters, Map<String,Map<String,String>> allOptions, AccumuloConfiguration conf) { - final Property scopeProperty = IteratorScope.getProperty(scope); + final Property scopeProperty = getProperty(scope); final String scopePropertyKey = scopeProperty.getKey(); for (Entry<String,String> entry : conf.getAllPropertiesWithPrefix(scopeProperty).entrySet()) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java index 0000000,2e34f38..39aa684 mode 000000,100644..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java @@@ -1,0 -1,164 +1,164 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.accumulo.tserver; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collections; + import java.util.HashMap; + import java.util.List; + import java.util.Map; + + import org.apache.accumulo.core.client.impl.CompressedIterators; + import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; + import org.apache.accumulo.core.conf.AccumuloConfiguration; + import org.apache.accumulo.core.conf.Property; + import org.apache.accumulo.core.data.ArrayByteSequence; + import org.apache.accumulo.core.data.ByteSequence; + import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.Range; + import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.data.thrift.IterInfo; + import org.apache.accumulo.core.data.thrift.TCMResult; + import org.apache.accumulo.core.data.thrift.TCMStatus; + import org.apache.accumulo.core.data.thrift.TCondition; + import org.apache.accumulo.core.iterators.IteratorUtil; + import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; + import org.apache.accumulo.core.iterators.SortedKeyValueIterator; + import org.apache.accumulo.tserver.data.ServerConditionalMutation; + import org.apache.hadoop.io.Text; + + import com.google.common.base.Preconditions; + + public class ConditionCheckerContext { + private CompressedIterators compressedIters; + + private List<IterInfo> tableIters; + private Map<String,Map<String,String>> tableIterOpts; + private TabletIteratorEnvironment tie; + private String context; + private Map<String,Class<? extends SortedKeyValueIterator<Key,Value>>> classCache; + + private static class MergedIterConfig { + List<IterInfo> mergedIters; + Map<String,Map<String,String>> mergedItersOpts; + + MergedIterConfig(List<IterInfo> mergedIters, Map<String,Map<String,String>> mergedItersOpts) { + this.mergedIters = mergedIters; + this.mergedItersOpts = mergedItersOpts; + } + } + - private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<ByteSequence,MergedIterConfig>(); ++ private Map<ByteSequence,MergedIterConfig> mergedIterCache = new HashMap<>(); + + ConditionCheckerContext(CompressedIterators compressedIters, AccumuloConfiguration tableConf) { + this.compressedIters = compressedIters; + - tableIters = new ArrayList<IterInfo>(); - tableIterOpts = new HashMap<String,Map<String,String>>(); ++ tableIters = new ArrayList<>(); ++ tableIterOpts = new HashMap<>(); + + // parse table iterator config once + IteratorUtil.parseIterConf(IteratorScope.scan, tableIters, tableIterOpts, tableConf); + + context = tableConf.get(Property.TABLE_CLASSPATH); + - classCache = new HashMap<String,Class<? extends SortedKeyValueIterator<Key,Value>>>(); ++ classCache = new HashMap<>(); + + tie = new TabletIteratorEnvironment(IteratorScope.scan, tableConf); + } + + SortedKeyValueIterator<Key,Value> buildIterator(SortedKeyValueIterator<Key,Value> systemIter, TCondition tc) throws IOException { + + ArrayByteSequence key = new ArrayByteSequence(tc.iterators); + MergedIterConfig mic = mergedIterCache.get(key); + if (mic == null) { + IterConfig ic = compressedIters.decompress(tc.iterators); + - List<IterInfo> mergedIters = new ArrayList<IterInfo>(tableIters.size() + ic.ssiList.size()); - Map<String,Map<String,String>> mergedItersOpts = new HashMap<String,Map<String,String>>(tableIterOpts.size() + ic.ssio.size()); ++ List<IterInfo> mergedIters = new ArrayList<>(tableIters.size() + ic.ssiList.size()); ++ Map<String,Map<String,String>> mergedItersOpts = new HashMap<>(tableIterOpts.size() + ic.ssio.size()); + + IteratorUtil.mergeIteratorConfig(mergedIters, mergedItersOpts, tableIters, tableIterOpts, ic.ssiList, ic.ssio); + + mic = new MergedIterConfig(mergedIters, mergedItersOpts); + + mergedIterCache.put(key, mic); + } + + return IteratorUtil.loadIterators(systemIter, mic.mergedIters, mic.mergedItersOpts, tie, true, context, classCache); + } + + boolean checkConditions(SortedKeyValueIterator<Key,Value> systemIter, ServerConditionalMutation scm) throws IOException { + boolean add = true; + + for (TCondition tc : scm.getConditions()) { + + Range range; + if (tc.hasTimestamp) + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); + else + range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); + + SortedKeyValueIterator<Key,Value> iter = buildIterator(systemIter, tc); + + ByteSequence cf = new ArrayByteSequence(tc.getCf()); + iter.seek(range, Collections.singleton(cf), true); + Value val = null; + if (iter.hasTop()) { + val = iter.getTopValue(); + } + + if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { + add = false; + break; + } + } + return add; + } + + public class ConditionChecker { + + private List<ServerConditionalMutation> conditionsToCheck; + private List<ServerConditionalMutation> okMutations; + private List<TCMResult> results; + private boolean checked = false; + + public ConditionChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) { + this.conditionsToCheck = conditionsToCheck; + this.okMutations = okMutations; + this.results = results; + } + + public void check(SortedKeyValueIterator<Key,Value> systemIter) throws IOException { + Preconditions.checkArgument(!checked, "check() method should only be called once"); + checked = true; + + for (ServerConditionalMutation scm : conditionsToCheck) { + if (checkConditions(systemIter, scm)) { + okMutations.add(scm); + } else { + results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); + } + } + } + } + + public ConditionChecker newChecker(List<ServerConditionalMutation> conditionsToCheck, List<ServerConditionalMutation> okMutations, List<TCMResult> results) { + return new ConditionChecker(conditionsToCheck, okMutations, results); + } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 1e0d119,6023ae3..038d3e8 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@@ -57,11 -65,8 +57,10 @@@ import java.util.concurrent.locks.Reent import org.apache.accumulo.core.Constants; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; +import org.apache.accumulo.core.client.Durability; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.impl.CompressedIterators; - import org.apache.accumulo.core.client.impl.CompressedIterators.IterConfig; +import org.apache.accumulo.core.client.impl.DurabilityImpl; import org.apache.accumulo.core.client.impl.ScannerImpl; import org.apache.accumulo.core.client.impl.Tables; import org.apache.accumulo.core.client.impl.TabletLocator; @@@ -91,9 -98,9 +90,8 @@@ import org.apache.accumulo.core.data.th import org.apache.accumulo.core.data.thrift.TCMResult; import org.apache.accumulo.core.data.thrift.TCMStatus; import org.apache.accumulo.core.data.thrift.TColumn; - import org.apache.accumulo.core.data.thrift.TCondition; import org.apache.accumulo.core.data.thrift.TConditionalMutation; import org.apache.accumulo.core.data.thrift.TConditionalSession; -import org.apache.accumulo.core.data.thrift.TKey; import org.apache.accumulo.core.data.thrift.TKeyExtent; import org.apache.accumulo.core.data.thrift.TKeyValue; import org.apache.accumulo.core.data.thrift.TMutation; @@@ -197,7 -193,21 +195,8 @@@ import org.apache.accumulo.server.zooke import org.apache.accumulo.server.zookeeper.ZooReaderWriter; import org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader; import org.apache.accumulo.start.classloader.vfs.ContextManager; -import org.apache.accumulo.trace.instrument.Span; -import org.apache.accumulo.trace.instrument.Trace; -import org.apache.accumulo.trace.thrift.TInfo; -import org.apache.accumulo.tserver.Compactor.CompactionInfo; + import org.apache.accumulo.tserver.ConditionCheckerContext.ConditionChecker; import org.apache.accumulo.tserver.RowLocks.RowLock; -import org.apache.accumulo.tserver.Tablet.CommitSession; -import org.apache.accumulo.tserver.Tablet.KVEntry; -import org.apache.accumulo.tserver.Tablet.LookupResult; -import org.apache.accumulo.tserver.Tablet.MinorCompactionReason; -import org.apache.accumulo.tserver.Tablet.ScanBatch; -import org.apache.accumulo.tserver.Tablet.Scanner; -import org.apache.accumulo.tserver.Tablet.SplitInfo; -import org.apache.accumulo.tserver.Tablet.TConstraintViolationException; -import org.apache.accumulo.tserver.Tablet.TabletClosedException; import org.apache.accumulo.tserver.TabletServerResourceManager.TabletResourceManager; import org.apache.accumulo.tserver.TabletStatsKeeper.Operation; import org.apache.accumulo.tserver.compaction.MajorCompactionReason; @@@ -209,30 -219,10 +208,28 @@@ import org.apache.accumulo.tserver.log. import org.apache.accumulo.tserver.mastermessage.MasterMessage; import org.apache.accumulo.tserver.mastermessage.SplitReportMessage; import org.apache.accumulo.tserver.mastermessage.TabletStatusMessage; -import org.apache.accumulo.tserver.metrics.TabletServerMBean; -import org.apache.accumulo.tserver.metrics.TabletServerMinCMetrics; +import org.apache.accumulo.tserver.metrics.TabletServerMetricsFactory; import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics; import org.apache.accumulo.tserver.metrics.TabletServerUpdateMetrics; +import org.apache.accumulo.tserver.replication.ReplicationServicerHandler; +import org.apache.accumulo.tserver.replication.ReplicationWorker; +import org.apache.accumulo.tserver.scan.LookupTask; +import org.apache.accumulo.tserver.scan.NextBatchTask; +import org.apache.accumulo.tserver.scan.ScanRunState; +import org.apache.accumulo.tserver.session.ConditionalSession; +import org.apache.accumulo.tserver.session.MultiScanSession; +import org.apache.accumulo.tserver.session.ScanSession; +import org.apache.accumulo.tserver.session.Session; +import org.apache.accumulo.tserver.session.SessionManager; +import org.apache.accumulo.tserver.session.UpdateSession; +import org.apache.accumulo.tserver.tablet.CommitSession; +import org.apache.accumulo.tserver.tablet.CompactionInfo; +import org.apache.accumulo.tserver.tablet.CompactionWatcher; +import org.apache.accumulo.tserver.tablet.Compactor; - import org.apache.accumulo.tserver.tablet.KVEntry; +import org.apache.accumulo.tserver.tablet.ScanBatch; - import org.apache.accumulo.tserver.tablet.Scanner; +import org.apache.accumulo.tserver.tablet.SplitInfo; +import org.apache.accumulo.tserver.tablet.Tablet; +import org.apache.accumulo.tserver.tablet.TabletClosedException; import org.apache.commons.collections.map.LRUMap; import org.apache.hadoop.fs.FSError; import org.apache.hadoop.fs.FileSystem; @@@ -256,151 -256,781 +253,150 @@@ public class TabletServer extends Accum private static final long RECENTLY_SPLIT_MILLIES = 60 * 1000; private static final long TIME_BETWEEN_GC_CHECKS = 5000; private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000; - private static final Set<Column> EMPTY_COLUMNS = Collections.emptySet(); - private TabletServerLogger logger; - - protected TabletServerMinCMetrics mincMetrics = new TabletServerMinCMetrics(); + private final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger(); + private final TransactionWatcher watcher = new TransactionWatcher(); + private final ZooCache masterLockCache = new ZooCache(); - private ServerConfiguration serverConfig; - private LogSorter logSorter = null; + private final TabletServerLogger logger; - public TabletServer(ServerConfiguration conf, VolumeManager fs) { - super(); - this.serverConfig = conf; - this.instance = conf.getInstance(); - this.fs = fs; + private final TabletServerMetricsFactory metricsFactory; + private final Metrics updateMetrics; + private final Metrics scanMetrics; + private final Metrics mincMetrics; - log.info("Version " + Constants.VERSION); - log.info("Instance " + instance.getInstanceID()); - - this.logSorter = new LogSorter(instance, fs, getSystemConfiguration()); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - synchronized (onlineTablets) { - long now = System.currentTimeMillis(); - for (Tablet tablet : onlineTablets.values()) - try { - tablet.updateRates(now); - } catch (Exception ex) { - log.error(ex, ex); - } - } - } - }, TIME_BETWEEN_GC_CHECKS, TIME_BETWEEN_GC_CHECKS); - SimpleTimer.getInstance().schedule(new Runnable() { - @Override - public void run() { - TabletLocator.clearLocators(); - } - }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS)); + public Metrics getMinCMetrics() { + return mincMetrics; } - private static long jitter(long ms) { - Random r = new Random(); - // add a random 10% wait - return (long) ((1. + (r.nextDouble() / 10)) * ms); - } - - private synchronized static void logGCInfo(AccumuloConfiguration conf) { - long now = System.currentTimeMillis(); - - List<GarbageCollectorMXBean> gcmBeans = ManagementFactory.getGarbageCollectorMXBeans(); - Runtime rt = Runtime.getRuntime(); - - StringBuilder sb = new StringBuilder("gc"); - - boolean sawChange = false; - - long maxIncreaseInCollectionTime = 0; - - for (GarbageCollectorMXBean gcBean : gcmBeans) { - Long prevTime = prevGcTime.get(gcBean.getName()); - long pt = 0; - if (prevTime != null) { - pt = prevTime; - } - - long time = gcBean.getCollectionTime(); - - if (time - pt != 0) { - sawChange = true; - } - - long increaseInCollectionTime = time - pt; - sb.append(String.format(" %s=%,.2f(+%,.2f) secs", gcBean.getName(), time / 1000.0, increaseInCollectionTime / 1000.0)); - maxIncreaseInCollectionTime = Math.max(increaseInCollectionTime, maxIncreaseInCollectionTime); - prevGcTime.put(gcBean.getName(), time); - } - - long mem = rt.freeMemory(); - if (maxIncreaseInCollectionTime == 0) { - gcTimeIncreasedCount = 0; - } else { - gcTimeIncreasedCount++; - if (gcTimeIncreasedCount > 3 && mem < rt.maxMemory() * 0.05) { - log.warn("Running low on memory"); - gcTimeIncreasedCount = 0; - } - } - - if (mem > lastMemorySize) { - sawChange = true; - } - - String sign = "+"; - if (mem - lastMemorySize <= 0) { - sign = ""; - } - - sb.append(String.format(" freemem=%,d(%s%,d) totalmem=%,d", mem, sign, (mem - lastMemorySize), rt.totalMemory())); - - if (sawChange) { - log.debug(sb.toString()); - } - - final long keepAliveTimeout = conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT); - if (lastMemoryCheckTime > 0 && lastMemoryCheckTime < now) { - long diff = now - lastMemoryCheckTime; - if (diff > keepAliveTimeout + 1000) { - log.warn(String.format("GC pause checker not called in a timely fashion. Expected every %.1f seconds but was %.1f seconds since last check", - TIME_BETWEEN_GC_CHECKS / 1000., diff / 1000.)); - } - lastMemoryCheckTime = now; - return; - } - - if (maxIncreaseInCollectionTime > keepAliveTimeout) { - Halt.halt("Garbage collection may be interfering with lock keep-alive. Halting.", -1); - } - - lastMemorySize = mem; - lastMemoryCheckTime = now; - } - - private TabletStatsKeeper statsKeeper; - - private static class Session { - long lastAccessTime; - long startTime; - String user; - String client = TServerUtils.clientAddress.get(); - public boolean reserved; - - public boolean cleanup() { - return true; - } - } - - private static class SessionManager { - - SecureRandom random; - Map<Long,Session> sessions; - private long maxIdle; - private long maxUpdateIdle; - private List<Session> idleSessions = new ArrayList<Session>(); - private final Long expiredSessionMarker = new Long(-1); - - SessionManager(AccumuloConfiguration conf) { - random = new SecureRandom(); - sessions = new HashMap<Long,Session>(); - maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE); - maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); - - Runnable r = new Runnable() { - @Override - public void run() { - sweep(maxIdle, maxUpdateIdle); - } - }; - - SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000)); - } - - synchronized long createSession(Session session, boolean reserve) { - long sid = random.nextLong(); - - while (sessions.containsKey(sid)) { - sid = random.nextLong(); - } - - sessions.put(sid, session); - - session.reserved = reserve; - - session.startTime = session.lastAccessTime = System.currentTimeMillis(); - - return sid; - } - - long getMaxIdleTime() { - return maxIdle; - } - - /** - * while a session is reserved, it cannot be canceled or removed - */ - synchronized Session reserveSession(long sessionId) { - Session session = sessions.get(sessionId); - if (session != null) { - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; - } - - return session; - - } - - synchronized Session reserveSession(long sessionId, boolean wait) { - Session session = sessions.get(sessionId); - if (session != null) { - while (wait && session.reserved) { - try { - wait(1000); - } catch (InterruptedException e) { - throw new RuntimeException(); - } - } - - if (session.reserved) - throw new IllegalStateException(); - session.reserved = true; - } - - return session; - - } - - synchronized void unreserveSession(Session session) { - if (!session.reserved) - throw new IllegalStateException(); - notifyAll(); - session.reserved = false; - session.lastAccessTime = System.currentTimeMillis(); - } - - synchronized void unreserveSession(long sessionId) { - Session session = getSession(sessionId); - if (session != null) - unreserveSession(session); - } - - synchronized Session getSession(long sessionId) { - Session session = sessions.get(sessionId); - if (session != null) - session.lastAccessTime = System.currentTimeMillis(); - return session; - } - - Session removeSession(long sessionId) { - return removeSession(sessionId, false); - } - - Session removeSession(long sessionId, boolean unreserve) { - Session session = null; - synchronized (this) { - session = sessions.remove(sessionId); - if (unreserve && session != null) - unreserveSession(session); - } - - // do clean up out side of lock.. - if (session != null) - session.cleanup(); - - return session; - } - - private void sweep(final long maxIdle, final long maxUpdateIdle) { - ArrayList<Session> sessionsToCleanup = new ArrayList<Session>(); - synchronized (this) { - Iterator<Session> iter = sessions.values().iterator(); - while (iter.hasNext()) { - Session session = iter.next(); - long configuredIdle = maxIdle; - if (session instanceof UpdateSession) { - configuredIdle = maxUpdateIdle; - } - long idleTime = System.currentTimeMillis() - session.lastAccessTime; - if (idleTime > configuredIdle && !session.reserved) { - log.info("Closing idle session from user=" + session.user + ", client=" + session.client + ", idle=" + idleTime + "ms"); - iter.remove(); - sessionsToCleanup.add(session); - } - } - } - - // do clean up outside of lock for TabletServer in a synchronized block for simplicity vice a synchronized list - - synchronized (idleSessions) { - - sessionsToCleanup.addAll(idleSessions); - - idleSessions.clear(); - - // perform cleanup for all of the sessions - for (Session session : sessionsToCleanup) { - if (!session.cleanup()) - idleSessions.add(session); - } - } - - } - - synchronized void removeIfNotAccessed(final long sessionId, final long delay) { - Session session = sessions.get(sessionId); - if (session != null) { - final long removeTime = session.lastAccessTime; - TimerTask r = new TimerTask() { - @Override - public void run() { - Session sessionToCleanup = null; - synchronized (SessionManager.this) { - Session session2 = sessions.get(sessionId); - if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { - log.info("Closing not accessed session from user=" + session2.user + ", client=" + session2.client + ", duration=" + delay + "ms"); - sessions.remove(sessionId); - sessionToCleanup = session2; - } - } - - // call clean up outside of lock - if (sessionToCleanup != null) - sessionToCleanup.cleanup(); - } - }; - - SimpleTimer.getInstance().schedule(r, delay); - } - } - - public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { - Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>(); - Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); - - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } - } - - for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { - - Session session = entry.getValue(); - @SuppressWarnings("rawtypes") - ScanTask nbt = null; - String tableID = null; - - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; - nbt = ss.nextBatchTask; - tableID = ss.extent.getTableId().toString(); - } else if (session instanceof MultiScanSession) { - MultiScanSession mss = (MultiScanSession) session; - nbt = mss.lookupTask; - tableID = mss.threadPoolExtent.getTableId().toString(); - } - - if (nbt == null) - continue; + private final LogSorter logSorter; + private ReplicationWorker replWorker = null; + private final TabletStatsKeeper statsKeeper; + private final AtomicInteger logIdGenerator = new AtomicInteger(); - ScanRunState srs = nbt.getScanRunState(); + private final AtomicLong flushCounter = new AtomicLong(0); + private final AtomicLong syncCounter = new AtomicLong(0); - if (srs == ScanRunState.FINISHED) - continue; + private final VolumeManager fs; - MapCounter<ScanRunState> stateCounts = counts.get(tableID); - if (stateCounts == null) { - stateCounts = new MapCounter<ScanRunState>(); - counts.put(tableID, stateCounts); - } + private final SortedMap<KeyExtent,Tablet> onlineTablets = Collections.synchronizedSortedMap(new TreeMap<KeyExtent,Tablet>()); + private final SortedSet<KeyExtent> unopenedTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + private final SortedSet<KeyExtent> openingTablets = Collections.synchronizedSortedSet(new TreeSet<KeyExtent>()); + @SuppressWarnings("unchecked") + private final Map<KeyExtent,Long> recentlyUnloadedCache = Collections.synchronizedMap(new LRUMap(1000)); - stateCounts.increment(srs, 1); - } + private final TabletServerResourceManager resourceManager; + private final SecurityOperation security; - return counts; - } + private final BlockingDeque<MasterMessage> masterMessages = new LinkedBlockingDeque<MasterMessage>(); - public synchronized List<ActiveScan> getActiveScans() { + private Thread majorCompactorThread; - final List<ActiveScan> activeScans = new ArrayList<ActiveScan>(); - final long ct = System.currentTimeMillis(); - final Set<Entry<Long,Session>> copiedIdleSessions = new HashSet<Entry<Long,Session>>(); + private HostAndPort replicationAddress; + private HostAndPort clientAddress; - synchronized (idleSessions) { - /** - * Add sessions so that get the list returned in the active scans call - */ - for (Session session : idleSessions) { - copiedIdleSessions.add(Maps.immutableEntry(expiredSessionMarker, session)); - } - } + private volatile boolean serverStopRequested = false; + private volatile boolean majorCompactorDisabled = false; + private volatile boolean shutdownComplete = false; - for (Entry<Long,Session> entry : Iterables.concat(sessions.entrySet(), copiedIdleSessions)) { - Session session = entry.getValue(); - if (session instanceof ScanSession) { - ScanSession ss = (ScanSession) session; + private ZooLock tabletServerLock; - ScanState state = ScanState.RUNNING; + private TServer server; + private TServer replServer; - ScanTask<ScanBatch> nbt = ss.nextBatchTask; - if (nbt == null) { - state = ScanState.IDLE; - } else { - switch (nbt.getScanRunState()) { - case QUEUED: - state = ScanState.QUEUED; - break; - case FINISHED: - state = ScanState.IDLE; - break; - case RUNNING: - default: - /* do nothing */ - break; - } - } + private DistributedWorkQueue bulkFailedCopyQ; - ActiveScan activeScan = new ActiveScan(ss.client, ss.user, ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, - ScanType.SINGLE, state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, - ss.auths.getAuthorizationsBB()); + private String lockID; - // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor - activeScan.setScanId(entry.getKey()); - activeScans.add(activeScan); + public static final AtomicLong seekCount = new AtomicLong(0); - } else if (session instanceof MultiScanSession) { - MultiScanSession mss = (MultiScanSession) session; + private final AtomicLong totalMinorCompactions = new AtomicLong(0); + private final ServerConfigurationFactory confFactory; - ScanState state = ScanState.RUNNING; + private final ZooAuthenticationKeyWatcher authKeyWatcher; - ScanTask<MultiScanResult> nbt = mss.lookupTask; - if (nbt == null) { - state = ScanState.IDLE; - } else { - switch (nbt.getScanRunState()) { - case QUEUED: - state = ScanState.QUEUED; - break; - case FINISHED: - state = ScanState.IDLE; - break; - case RUNNING: - default: - /* do nothing */ - break; + public TabletServer(ServerConfigurationFactory confFactory, VolumeManager fs) { + super(confFactory); + this.confFactory = confFactory; + this.fs = fs; + AccumuloConfiguration aconf = getConfiguration(); + Instance instance = getInstance(); + log.info("Version " + Constants.VERSION); + log.info("Instance " + instance.getInstanceID()); + this.sessionManager = new SessionManager(aconf); + this.logSorter = new LogSorter(instance, fs, aconf); + this.replWorker = new ReplicationWorker(this, fs); + this.statsKeeper = new TabletStatsKeeper(); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + @Override + public void run() { + synchronized (onlineTablets) { + long now = System.currentTimeMillis(); + for (Tablet tablet : onlineTablets.values()) + try { + tablet.updateRates(now); + } catch (Exception ex) { + log.error("Error updating rates for {}", tablet.getExtent(), ex); } - } - - activeScans.add(new ActiveScan(mss.client, mss.user, mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, - ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths - .getAuthorizationsBB())); } } + }, 5000, 5000); - return activeScans; - } - } - - static class TservConstraintEnv implements Environment { - - private TCredentials credentials; - private SecurityOperation security; - private Authorizations auths; - private KeyExtent ke; - - TservConstraintEnv(SecurityOperation secOp, TCredentials credentials) { - this.security = secOp; - this.credentials = credentials; - } - - void setExtent(KeyExtent ke) { - this.ke = ke; - } - - @Override - public KeyExtent getExtent() { - return ke; - } - - @Override - public String getUser() { - return credentials.getPrincipal(); - } - - @Override - @Deprecated - public Authorizations getAuthorizations() { - if (auths == null) - try { - this.auths = security.getUserAuthorizations(credentials); - } catch (ThriftSecurityException e) { - throw new RuntimeException(e); - } - return auths; - } - - @Override - public AuthorizationContainer getAuthorizationsContainer() { - return new AuthorizationContainer() { - @Override - public boolean contains(ByteSequence auth) { - try { - return security.userHasAuthorizations(credentials, - Collections.<ByteBuffer> singletonList(ByteBuffer.wrap(auth.getBackingArray(), auth.offset(), auth.length()))); - } catch (ThriftSecurityException e) { - throw new RuntimeException(e); - } - } - }; - } - } - - private abstract class ScanTask<T> implements RunnableFuture<T> { - - protected AtomicBoolean interruptFlag; - protected ArrayBlockingQueue<Object> resultQueue; - protected AtomicInteger state; - protected AtomicReference<ScanRunState> runState; - - private static final int INITIAL = 1; - private static final int ADDED = 2; - private static final int CANCELED = 3; - - ScanTask() { - interruptFlag = new AtomicBoolean(false); - runState = new AtomicReference<ScanRunState>(ScanRunState.QUEUED); - state = new AtomicInteger(INITIAL); - resultQueue = new ArrayBlockingQueue<Object>(1); - } - - protected void addResult(Object o) { - if (state.compareAndSet(INITIAL, ADDED)) - resultQueue.add(o); - else if (state.get() == ADDED) - throw new IllegalStateException("Tried to add more than one result"); - } - - @Override - public boolean cancel(boolean mayInterruptIfRunning) { - if (!mayInterruptIfRunning) - throw new IllegalArgumentException("Cancel will always attempt to interupt running next batch task"); - - if (state.get() == CANCELED) - return true; - - if (state.compareAndSet(INITIAL, CANCELED)) { - interruptFlag.set(true); - resultQueue = null; - return true; - } - - return false; - } - - @Override - public T get() throws InterruptedException, ExecutionException { - throw new UnsupportedOperationException(); - } - - @SuppressWarnings("unchecked") - @Override - public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { - - ArrayBlockingQueue<Object> localRQ = resultQueue; - - if (state.get() == CANCELED) - throw new CancellationException(); - - if (localRQ == null) { - int st = state.get(); - String stateStr; - switch (st) { - case ADDED: - stateStr = "ADDED"; - break; - case CANCELED: - stateStr = "CANCELED"; - break; - case INITIAL: - stateStr = "INITIAL"; - break; - default: - stateStr = "UNKNOWN"; - break; - } - throw new IllegalStateException("Tried to get result twice [state=" + stateStr + "(" + st + ")]"); - } - - Object r = localRQ.poll(timeout, unit); - - // could have been canceled while waiting - if (state.get() == CANCELED) { - if (r != null) - throw new IllegalStateException("Nothing should have been added when in canceled state"); + final long walogMaxSize = aconf.getMemoryInBytes(Property.TSERV_WALOG_MAX_SIZE); + final long minBlockSize = CachedConfiguration.getInstance().getLong("dfs.namenode.fs-limits.min-block-size", 0); + if (minBlockSize != 0 && minBlockSize > walogMaxSize) + throw new RuntimeException("Unable to start TabletServer. Logger is set to use blocksize " + walogMaxSize + " but hdfs minimum block size is " + + minBlockSize + ". Either increase the " + Property.TSERV_WALOG_MAX_SIZE + " or decrease dfs.namenode.fs-limits.min-block-size in hdfs-site.xml."); - throw new CancellationException(); + final long toleratedWalCreationFailures = aconf.getCount(Property.TSERV_WALOG_TOLERATED_CREATION_FAILURES); + final long walCreationFailureRetryIncrement = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_WAIT_INCREMENT); + final long walCreationFailureRetryMax = aconf.getTimeInMillis(Property.TSERV_WALOG_TOLERATED_MAXIMUM_WAIT_DURATION); + // Tolerate `toleratedWalCreationFailures` failures, waiting `walCreationFailureRetryIncrement` milliseconds after the first failure, + // incrementing the next wait period by the same value, for a maximum of `walCreationFailureRetryMax` retries. + final RetryFactory walCreationRetryFactory = new RetryFactory(toleratedWalCreationFailures, walCreationFailureRetryIncrement, + walCreationFailureRetryIncrement, walCreationFailureRetryMax); + + logger = new TabletServerLogger(this, walogMaxSize, syncCounter, flushCounter, walCreationRetryFactory); + this.resourceManager = new TabletServerResourceManager(this, fs); + this.security = AuditedSecurityOperation.getInstance(this); + + metricsFactory = new TabletServerMetricsFactory(aconf); + updateMetrics = metricsFactory.createUpdateMetrics(); + scanMetrics = metricsFactory.createScanMetrics(); + mincMetrics = metricsFactory.createMincMetrics(); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { + @Override + public void run() { + TabletLocator.clearLocators(); } + }, jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS), jitter(TIME_BETWEEN_LOCATOR_CACHE_CLEARS)); - if (r == null) - throw new TimeoutException(); - - // make this method stop working now that something is being - // returned - resultQueue = null; - - if (r instanceof Throwable) - throw new ExecutionException((Throwable) r); - - return (T) r; - } - - @Override - public boolean isCancelled() { - return state.get() == CANCELED; - } - - @Override - public boolean isDone() { - return runState.get().equals(ScanRunState.FINISHED); - } - - public ScanRunState getScanRunState() { - return runState.get(); - } - - } - - private static class ConditionalSession extends Session { - public TCredentials credentials; - public Authorizations auths; - public String tableId; - public AtomicBoolean interruptFlag; - - @Override - public boolean cleanup() { - interruptFlag.set(true); - return true; - } - } - - private static class UpdateSession extends Session { - public Tablet currentTablet; - public MapCounter<Tablet> successfulCommits = new MapCounter<Tablet>(); - Map<KeyExtent,Long> failures = new HashMap<KeyExtent,Long>(); - HashMap<KeyExtent,SecurityErrorCode> authFailures = new HashMap<KeyExtent,SecurityErrorCode>(); - public Violations violations; - public TCredentials credentials; - public long totalUpdates = 0; - public long flushTime = 0; - Stat prepareTimes = new Stat(); - Stat walogTimes = new Stat(); - Stat commitTimes = new Stat(); - Stat authTimes = new Stat(); - public Map<Tablet,List<Mutation>> queuedMutations = new HashMap<Tablet,List<Mutation>>(); - public long queuedMutationSize = 0; - TservConstraintEnv cenv = null; - } - - private static class ScanSession extends Session { - public KeyExtent extent; - public HashSet<Column> columnSet; - public List<IterInfo> ssiList; - public Map<String,Map<String,String>> ssio; - public Authorizations auths; - public long entriesReturned = 0; - public Stat nbTimes = new Stat(); - public long batchCount = 0; - public volatile ScanTask<ScanBatch> nextBatchTask; - public AtomicBoolean interruptFlag; - public Scanner scanner; - public long readaheadThreshold = Constants.SCANNER_DEFAULT_READAHEAD_THRESHOLD; - - @Override - public boolean cleanup() { - final boolean ret; - try { - if (nextBatchTask != null) - nextBatchTask.cancel(true); - } finally { - if (scanner != null) - ret = scanner.close(); - else - ret = true; - } - return ret; + // Create the secret manager + setSecretManager(new AuthenticationTokenSecretManager(instance, aconf.getTimeInMillis(Property.GENERAL_DELEGATION_TOKEN_LIFETIME))); + if (aconf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) { + log.info("SASL is enabled, creating ZooKeeper watcher for AuthenticationKeys"); + // Watcher to notice new AuthenticationKeys which enable delegation tokens + authKeyWatcher = new ZooAuthenticationKeyWatcher(getSecretManager(), ZooReaderWriter.getInstance(), ZooUtil.getRoot(instance) + + Constants.ZDELEGATION_TOKEN_KEYS); + } else { + authKeyWatcher = null; } - } - private static class MultiScanSession extends Session { - HashSet<Column> columnSet; - Map<KeyExtent,List<Range>> queries; - public List<IterInfo> ssiList; - public Map<String,Map<String,String>> ssio; - public Authorizations auths; - - // stats - int numRanges; - int numTablets; - int numEntries; - long totalLookupTime; - - public volatile ScanTask<MultiScanResult> lookupTask; - public KeyExtent threadPoolExtent; - - @Override - public boolean cleanup() { - if (lookupTask != null) - lookupTask.cancel(true); - // the cancellation should provide us the safety to return true here - return true; - } + private static long jitter(long ms) { + Random r = new Random(); + // add a random 10% wait + return (long) ((1. + (r.nextDouble() / 10)) * ms); } - /** - * This little class keeps track of writes in progress and allows readers to wait for writes that started before the read. It assumes that the operation ids - * are monotonically increasing. - * - */ - static class WriteTracker { - private static AtomicLong operationCounter = new AtomicLong(1); - private Map<TabletType,TreeSet<Long>> inProgressWrites = new EnumMap<TabletType,TreeSet<Long>>(TabletType.class); - - WriteTracker() { - for (TabletType ttype : TabletType.values()) { - inProgressWrites.put(ttype, new TreeSet<Long>()); - } - } - - synchronized long startWrite(TabletType ttype) { - long operationId = operationCounter.getAndIncrement(); - inProgressWrites.get(ttype).add(operationId); - return operationId; - } - - synchronized void finishWrite(long operationId) { - if (operationId == -1) - return; + private final SessionManager sessionManager; - boolean removed = false; + private final WriteTracker writeTracker = new WriteTracker(); - for (TabletType ttype : TabletType.values()) { - removed = inProgressWrites.get(ttype).remove(operationId); - if (removed) - break; - } + private final RowLocks rowLocks = new RowLocks(); - if (!removed) { - throw new IllegalArgumentException("Attempted to finish write not in progress, operationId " + operationId); - } - - this.notifyAll(); - } - - synchronized void waitForWrites(TabletType ttype) { - long operationId = operationCounter.getAndIncrement(); - while (inProgressWrites.get(ttype).floor(operationId) != null) { - try { - this.wait(); - } catch (InterruptedException e) { - log.error(e, e); - } - } - } - - public long startWrite(Set<Tablet> keySet) { - if (keySet.size() == 0) - return -1; - - ArrayList<KeyExtent> extents = new ArrayList<KeyExtent>(keySet.size()); - - for (Tablet tablet : keySet) - extents.add(tablet.getExtent()); - - return startWrite(TabletType.type(extents)); - } - } - - public AccumuloConfiguration getSystemConfiguration() { - return serverConfig.getConfiguration(); - } - - TransactionWatcher watcher = new TransactionWatcher(); + private final AtomicLong totalQueuedMutationSize = new AtomicLong(0); + private final ReentrantLock recoveryLock = new ReentrantLock(true); private class ThriftClientHandler extends ClientServiceHandler implements TabletClientService.Iface { @@@ -1071,6 -1930,7 +1067,7 @@@ Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter = updates.entrySet().iterator(); final CompressedIterators compressedIters = new CompressedIterators(symbols); - ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, ServerConfiguration.getTableConfiguration(instance, cs.tableId)); ++ ConditionCheckerContext checkerContext = new ConditionCheckerContext(compressedIters, confFactory.getTableConfiguration(cs.tableId)); while (iter.hasNext()) { final Entry<KeyExtent,List<ServerConditionalMutation>> entry = iter.next(); @@@ -1082,65 -1942,44 +1079,28 @@@ iter.remove(); } else { final List<ServerConditionalMutation> okMutations = new ArrayList<ServerConditionalMutation>(entry.getValue().size()); + final List<TCMResult> resultsSubList = results.subList(results.size(), results.size()); - for (ServerConditionalMutation scm : entry.getValue()) { - if (checkCondition(results, cs, compressedIters, tablet, scm)) - okMutations.add(scm); - } - - entry.setValue(okMutations); - } - - } - } - - private boolean checkCondition(ArrayList<TCMResult> results, ConditionalSession cs, CompressedIterators compressedIters, Tablet tablet, - ServerConditionalMutation scm) throws IOException { - boolean add = true; - - for (TCondition tc : scm.getConditions()) { - - Range range; - if (tc.hasTimestamp) - range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv()), tc.getTs()); - else - range = Range.exact(new Text(scm.getRow()), new Text(tc.getCf()), new Text(tc.getCq()), new Text(tc.getCv())); - - IterConfig ic = compressedIters.decompress(tc.iterators); - - Scanner scanner = tablet.createScanner(range, 1, EMPTY_COLUMNS, cs.auths, ic.ssiList, ic.ssio, false, cs.interruptFlag); - - try { - ScanBatch batch = scanner.read(); - - Value val = null; + ConditionChecker checker = checkerContext.newChecker(entry.getValue(), okMutations, resultsSubList); + try { + tablet.checkConditions(checker, cs.auths, cs.interruptFlag); - for (KVEntry entry2 : batch.getResults()) { - val = entry2.getValue(); - break; - } + if (okMutations.size() > 0) { + entry.setValue(okMutations); + } else { + iter.remove(); + } - } catch (TabletClosedException e) { - // clear anything added while checking conditions. - resultsSubList.clear(); - - for (ServerConditionalMutation scm : entry.getValue()) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - } - iter.remove(); - } catch (IterationInterruptedException e) { - // clear anything added while checking conditions. - resultsSubList.clear(); - - for (ServerConditionalMutation scm : entry.getValue()) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - } - iter.remove(); - } catch (TooManyFilesException e) { ++ } catch (TabletClosedException | IterationInterruptedException | TooManyFilesException e) { + // clear anything added while checking conditions. + resultsSubList.clear(); - if ((val == null ^ tc.getVal() == null) || (val != null && !Arrays.equals(tc.getVal(), val.get()))) { - results.add(new TCMResult(scm.getID(), TCMStatus.REJECTED)); - add = false; - break; + for (ServerConditionalMutation scm : entry.getValue()) { + results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); + } + iter.remove(); } - - } catch (TabletClosedException e) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (IterationInterruptedException iie) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; - } catch (TooManyFilesException tmfe) { - results.add(new TCMResult(scm.getID(), TCMStatus.IGNORED)); - add = false; - break; } } - return add; } private void writeConditionalMutations(Map<KeyExtent,List<ServerConditionalMutation>> updates, ArrayList<TCMResult> results, ConditionalSession sess) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/d4882a15/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java index 33277bd,0000000..f586e2e mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java @@@ -1,228 -1,0 +1,247 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.tablet; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; ++import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; ++import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.accumulo.core.data.Column; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Value; +import org.apache.accumulo.core.data.thrift.IterInfo; +import org.apache.accumulo.core.iterators.IterationInterruptedException; +import org.apache.accumulo.core.iterators.IteratorEnvironment; +import org.apache.accumulo.core.iterators.IteratorUtil; +import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope; +import org.apache.accumulo.core.iterators.SortedKeyValueIterator; +import org.apache.accumulo.core.iterators.system.ColumnFamilySkippingIterator; +import org.apache.accumulo.core.iterators.system.ColumnQualifierFilter; +import org.apache.accumulo.core.iterators.system.DeletingIterator; +import org.apache.accumulo.core.iterators.system.InterruptibleIterator; +import org.apache.accumulo.core.iterators.system.MultiIterator; +import org.apache.accumulo.core.iterators.system.SourceSwitchingIterator.DataSource; +import org.apache.accumulo.core.iterators.system.StatsIterator; +import org.apache.accumulo.core.iterators.system.VisibilityFilter; +import org.apache.accumulo.core.metadata.schema.DataFileValue; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.Pair; +import org.apache.accumulo.server.fs.FileRef; +import org.apache.accumulo.tserver.FileManager.ScanFileManager; +import org.apache.accumulo.tserver.InMemoryMap.MemoryIterator; +import org.apache.accumulo.tserver.TabletIteratorEnvironment; +import org.apache.accumulo.tserver.TabletServer; + +class ScanDataSource implements DataSource { + + // data source state + private final Tablet tablet; + private ScanFileManager fileManager; + private SortedKeyValueIterator<Key,Value> iter; + private long expectedDeletionCount; + private List<MemoryIterator> memIters = null; + private long fileReservationId; + private AtomicBoolean interruptFlag; + private StatsIterator statsIterator; + + private final ScanOptions options; ++ private final boolean loadIters; ++ ++ private static final Set<Column> EMPTY_COLS = Collections.emptySet(); + + ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList, + Map<String,Map<String,String>> ssio, AtomicBoolean interruptFlag) { + this.tablet = tablet; + expectedDeletionCount = tablet.getDataSourceDeletions(); + this.options = new ScanOptions(-1, authorizations, defaultLabels, columnSet, ssiList, ssio, interruptFlag, false); + this.interruptFlag = interruptFlag; ++ this.loadIters = true; + } + + ScanDataSource(Tablet tablet, ScanOptions options) { + this.tablet = tablet; + expectedDeletionCount = tablet.getDataSourceDeletions(); + this.options = options; + this.interruptFlag = options.getInterruptFlag(); ++ this.loadIters = true; ++ } ++ ++ ScanDataSource(Tablet tablet, Authorizations authorizations, byte[] defaultLabels, AtomicBoolean iFlag) { ++ this.tablet = tablet; ++ expectedDeletionCount = tablet.getDataSourceDeletions(); ++ this.options = new ScanOptions(-1, authorizations, defaultLabels, EMPTY_COLS, null, null, iFlag, false); ++ this.interruptFlag = iFlag; ++ this.loadIters = false; + } + + @Override + public DataSource getNewDataSource() { + if (!isCurrent()) { + // log.debug("Switching data sources during a scan"); + if (memIters != null) { + tablet.getTabletMemory().returnIterators(memIters); + memIters = null; + tablet.getDatafileManager().returnFilesForScan(fileReservationId); + fileReservationId = -1; + } + + if (fileManager != null) + fileManager.releaseOpenFiles(false); + + expectedDeletionCount = tablet.getDataSourceDeletions(); + iter = null; + + return this; + } else + return this; + } + + @Override + public boolean isCurrent() { + return expectedDeletionCount == tablet.getDataSourceDeletions(); + } + + @Override + public SortedKeyValueIterator<Key,Value> iterator() throws IOException { + if (iter == null) + iter = createIterator(); + return iter; + } + + private SortedKeyValueIterator<Key,Value> createIterator() throws IOException { + + Map<FileRef,DataFileValue> files; + + synchronized (tablet) { + + if (memIters != null) + throw new IllegalStateException("Tried to create new scan iterator w/o releasing memory"); + + if (tablet.isClosed()) + throw new TabletClosedException(); + + if (interruptFlag.get()) + throw new IterationInterruptedException(tablet.getExtent().toString() + " " + interruptFlag.hashCode()); + + // only acquire the file manager when we know the tablet is open + if (fileManager == null) { + fileManager = tablet.getTabletResources().newScanFileManager(); + tablet.addActiveScans(this); + } + + if (fileManager.getNumOpenFiles() != 0) + throw new IllegalStateException("Tried to create new scan iterator w/o releasing files"); + + // set this before trying to get iterators in case + // getIterators() throws an exception + expectedDeletionCount = tablet.getDataSourceDeletions(); + + memIters = tablet.getTabletMemory().getIterators(); + Pair<Long,Map<FileRef,DataFileValue>> reservation = tablet.getDatafileManager().reserveFilesForScan(); + fileReservationId = reservation.getFirst(); + files = reservation.getSecond(); + } + + Collection<InterruptibleIterator> mapfiles = fileManager.openFiles(files, options.isIsolated()); + + List<SortedKeyValueIterator<Key,Value>> iters = new ArrayList<SortedKeyValueIterator<Key,Value>>(mapfiles.size() + memIters.size()); + + iters.addAll(mapfiles); + iters.addAll(memIters); + + for (SortedKeyValueIterator<Key,Value> skvi : iters) + ((InterruptibleIterator) skvi).setInterruptFlag(interruptFlag); + + MultiIterator multiIter = new MultiIterator(iters, tablet.getExtent()); + + TabletIteratorEnvironment iterEnv = new TabletIteratorEnvironment(IteratorScope.scan, tablet.getTableConfiguration(), fileManager, files, + options.getAuthorizations()); + + statsIterator = new StatsIterator(multiIter, TabletServer.seekCount, tablet.getScannedCounter()); + + DeletingIterator delIter = new DeletingIterator(statsIterator, false); + + ColumnFamilySkippingIterator cfsi = new ColumnFamilySkippingIterator(delIter); + + ColumnQualifierFilter colFilter = new ColumnQualifierFilter(cfsi, options.getColumnSet()); + + VisibilityFilter visFilter = new VisibilityFilter(colFilter, options.getAuthorizations(), options.getDefaultLabels()); + - return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(), - options.getSsiList(), options.getSsio(), iterEnv)); ++ if (!loadIters) { ++ return visFilter; ++ } else { ++ return iterEnv.getTopLevelIterator(IteratorUtil.loadIterators(IteratorScope.scan, visFilter, tablet.getExtent(), tablet.getTableConfiguration(), ++ options.getSsiList(), options.getSsio(), iterEnv)); ++ } + } + + void close(boolean sawErrors) { + + if (memIters != null) { + tablet.getTabletMemory().returnIterators(memIters); + memIters = null; + tablet.getDatafileManager().returnFilesForScan(fileReservationId); + fileReservationId = -1; + } + + synchronized (tablet) { + if (tablet.removeScan(this) == 0) + tablet.notifyAll(); + } + + if (fileManager != null) { + fileManager.releaseOpenFiles(sawErrors); + fileManager = null; + } + + if (statsIterator != null) { + statsIterator.report(); + } + + } + + public void interrupt() { + interruptFlag.set(true); + } + + @Override + public DataSource getDeepCopyDataSource(IteratorEnvironment env) { + throw new UnsupportedOperationException(); + } + + public void reattachFileManager() throws IOException { + if (fileManager != null) + fileManager.reattach(); + } + + public void detachFileManager() { + if (fileManager != null) + fileManager.detach(); + } + + @Override + public void setInterruptFlag(AtomicBoolean flag) { + throw new UnsupportedOperationException(); + } + +}