Merge branch '1.6' Conflicts: server/tserver/src/main/java/org/apache/accumulo/tserver/Tablet.java server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java trace/src/main/java/org/apache/accumulo/trace/thrift/TInfo.java
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/44e2b2cd Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/44e2b2cd Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/44e2b2cd Branch: refs/heads/master Commit: 44e2b2cdf89b26a94540e653386bbe9ab269296f Parents: 44ad36c 3bf4666 Author: Corey J. Nolet <cjno...@gmail.com> Authored: Wed Jan 7 23:40:33 2015 -0500 Committer: Corey J. Nolet <cjno...@gmail.com> Committed: Wed Jan 7 23:40:33 2015 -0500 ---------------------------------------------------------------------- .../core/client/impl/ActiveScanImpl.java | 5 +- .../core/tabletserver/thrift/ActiveScan.java | 102 +++++- core/src/main/thrift/tabletserver.thrift | 1 + .../tserver/session/SessionManager.java | 8 +- .../accumulo/test/functional/ScanIdIT.java | 360 +++++++++++++++++++ 5 files changed, 469 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/core/src/main/thrift/tabletserver.thrift ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java ---------------------------------------------------------------------- diff --cc server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java index c9445c6,0000000..13049e2 mode 100644,000000..100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java @@@ -1,313 -1,0 +1,317 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.tserver.session; + +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.TimerTask; + +import org.apache.accumulo.core.client.impl.Translator; +import org.apache.accumulo.core.client.impl.Translators; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.thrift.MultiScanResult; +import org.apache.accumulo.core.tabletserver.thrift.ActiveScan; +import org.apache.accumulo.core.tabletserver.thrift.ScanState; +import org.apache.accumulo.core.tabletserver.thrift.ScanType; +import org.apache.accumulo.core.util.MapCounter; +import org.apache.accumulo.server.util.time.SimpleTimer; +import org.apache.accumulo.tserver.scan.ScanRunState; +import org.apache.accumulo.tserver.scan.ScanTask; +import org.apache.accumulo.tserver.tablet.ScanBatch; +import org.apache.log4j.Logger; + +public class SessionManager { + private static final Logger log = Logger.getLogger(SessionManager.class); + + private final SecureRandom random = new SecureRandom(); + private final Map<Long,Session> sessions = new HashMap<Long,Session>(); + private final long maxIdle; + private final AccumuloConfiguration aconf; + + public SessionManager(AccumuloConfiguration conf) { + aconf = conf; + maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE); + + Runnable r = new Runnable() { + @Override + public void run() { + sweep(maxIdle); + } + }; + + SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000)); + } + + public synchronized long createSession(Session session, boolean reserve) { + long sid = random.nextLong(); + + while (sessions.containsKey(sid)) { + sid = random.nextLong(); + } + + sessions.put(sid, session); + + session.reserved = reserve; + + session.startTime = session.lastAccessTime = System.currentTimeMillis(); + + return sid; + } + + public long getMaxIdleTime() { + return maxIdle; + } + + /** + * while a session is reserved, it cannot be canceled or removed + */ + + public synchronized Session reserveSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) { + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + public synchronized Session reserveSession(long sessionId, boolean wait) { + Session session = sessions.get(sessionId); + if (session != null) { + while (wait && session.reserved) { + try { + wait(1000); + } catch (InterruptedException e) { + throw new RuntimeException(); + } + } + + if (session.reserved) + throw new IllegalStateException(); + session.reserved = true; + } + + return session; + + } + + public synchronized void unreserveSession(Session session) { + if (!session.reserved) + throw new IllegalStateException(); + notifyAll(); + session.reserved = false; + session.lastAccessTime = System.currentTimeMillis(); + } + + public synchronized void unreserveSession(long sessionId) { + Session session = getSession(sessionId); + if (session != null) + unreserveSession(session); + } + + public synchronized Session getSession(long sessionId) { + Session session = sessions.get(sessionId); + if (session != null) + session.lastAccessTime = System.currentTimeMillis(); + return session; + } + + public Session removeSession(long sessionId) { + return removeSession(sessionId, false); + } + + public Session removeSession(long sessionId, boolean unreserve) { + Session session = null; + synchronized (this) { + session = sessions.remove(sessionId); + if (unreserve && session != null) + unreserveSession(session); + } + + // do clean up out side of lock.. + if (session != null) + session.cleanup(); + + return session; + } + + private void sweep(long maxIdle) { + List<Session> sessionsToCleanup = new ArrayList<Session>(); + synchronized (this) { + Iterator<Session> iter = sessions.values().iterator(); + while (iter.hasNext()) { + Session session = iter.next(); + long idleTime = System.currentTimeMillis() - session.lastAccessTime; + if (idleTime > maxIdle && !session.reserved) { + log.info("Closing idle session from user=" + session.getUser() + ", client=" + session.client + ", idle=" + idleTime + "ms"); + iter.remove(); + sessionsToCleanup.add(session); + } + } + } + + // do clean up outside of lock + for (Session session : sessionsToCleanup) { + session.cleanup(); + } + } + + public synchronized void removeIfNotAccessed(final long sessionId, final long delay) { + Session session = sessions.get(sessionId); + if (session != null) { + final long removeTime = session.lastAccessTime; + TimerTask r = new TimerTask() { + @Override + public void run() { + Session sessionToCleanup = null; + synchronized (SessionManager.this) { + Session session2 = sessions.get(sessionId); + if (session2 != null && session2.lastAccessTime == removeTime && !session2.reserved) { + log.info("Closing not accessed session from user=" + session2.getUser() + ", client=" + session2.client + ", duration=" + delay + "ms"); + sessions.remove(sessionId); + sessionToCleanup = session2; + } + } + + // call clean up outside of lock + if (sessionToCleanup != null) + sessionToCleanup.cleanup(); + } + }; + + SimpleTimer.getInstance(aconf).schedule(r, delay); + } + } + + public synchronized Map<String,MapCounter<ScanRunState>> getActiveScansPerTable() { + Map<String,MapCounter<ScanRunState>> counts = new HashMap<String,MapCounter<ScanRunState>>(); + for (Entry<Long,Session> entry : sessions.entrySet()) { + + Session session = entry.getValue(); + @SuppressWarnings("rawtypes") + ScanTask nbt = null; + String tableID = null; + + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + nbt = ss.nextBatchTask; + tableID = ss.extent.getTableId().toString(); + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + nbt = mss.lookupTask; + tableID = mss.threadPoolExtent.getTableId().toString(); + } + + if (nbt == null) + continue; + + ScanRunState srs = nbt.getScanRunState(); + + if (srs == ScanRunState.FINISHED) + continue; + + MapCounter<ScanRunState> stateCounts = counts.get(tableID); + if (stateCounts == null) { + stateCounts = new MapCounter<ScanRunState>(); + counts.put(tableID, stateCounts); + } + + stateCounts.increment(srs, 1); + } + + return counts; + } + + public synchronized List<ActiveScan> getActiveScans() { + + List<ActiveScan> activeScans = new ArrayList<ActiveScan>(); + + long ct = System.currentTimeMillis(); + + for (Entry<Long,Session> entry : sessions.entrySet()) { + Session session = entry.getValue(); + if (session instanceof ScanSession) { + ScanSession ss = (ScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask<ScanBatch> nbt = ss.nextBatchTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + - activeScans.add(new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE, - state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB())); ++ ActiveScan activeScan = new ActiveScan(ss.client, ss.getUser(), ss.extent.getTableId().toString(), ct - ss.startTime, ct - ss.lastAccessTime, ScanType.SINGLE, ++ state, ss.extent.toThrift(), Translator.translate(ss.columnSet, Translators.CT), ss.ssiList, ss.ssio, ss.auths.getAuthorizationsBB()); ++ ++ // scanId added by ACCUMULO-2641 is an optional thrift argument and not available in ActiveScan constructor ++ activeScan.setScanId(entry.getKey()); ++ activeScans.add(activeScan); + + } else if (session instanceof MultiScanSession) { + MultiScanSession mss = (MultiScanSession) session; + + ScanState state = ScanState.RUNNING; + + ScanTask<MultiScanResult> nbt = mss.lookupTask; + if (nbt == null) { + state = ScanState.IDLE; + } else { + switch (nbt.getScanRunState()) { + case QUEUED: + state = ScanState.QUEUED; + break; + case FINISHED: + state = ScanState.IDLE; + break; + case RUNNING: + default: + /* do nothing */ + break; + } + } + + activeScans.add(new ActiveScan(mss.client, mss.getUser(), mss.threadPoolExtent.getTableId().toString(), ct - mss.startTime, ct - mss.lastAccessTime, + ScanType.BATCH, state, mss.threadPoolExtent.toThrift(), Translator.translate(mss.columnSet, Translators.CT), mss.ssiList, mss.ssio, mss.auths + .getAuthorizationsBB())); + } + } + + return activeScans; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/44e2b2cd/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java ---------------------------------------------------------------------- diff --cc test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java index 0000000,178cb30..fe2e8cb mode 000000,100644..100644 --- a/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java +++ b/test/src/test/java/org/apache/accumulo/test/functional/ScanIdIT.java @@@ -1,0 -1,360 +1,360 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.accumulo.test.functional; + ++import java.util.EnumSet; ++import java.util.HashSet; ++import java.util.List; ++import java.util.Map; ++import java.util.Random; ++import java.util.Set; ++import java.util.SortedSet; ++import java.util.TreeSet; ++import java.util.concurrent.ConcurrentHashMap; ++import java.util.concurrent.ExecutorService; ++import java.util.concurrent.Executors; ++ + import org.apache.accumulo.core.client.AccumuloException; + import org.apache.accumulo.core.client.AccumuloSecurityException; + import org.apache.accumulo.core.client.BatchWriter; + import org.apache.accumulo.core.client.BatchWriterConfig; + import org.apache.accumulo.core.client.Connector; + import org.apache.accumulo.core.client.IteratorSetting; + import org.apache.accumulo.core.client.MutationsRejectedException; + import org.apache.accumulo.core.client.Scanner; + import org.apache.accumulo.core.client.TableNotFoundException; + import org.apache.accumulo.core.client.admin.ActiveScan; + import org.apache.accumulo.core.data.Key; + import org.apache.accumulo.core.data.Mutation; + import org.apache.accumulo.core.data.Range; + import org.apache.accumulo.core.data.Value; + import org.apache.accumulo.core.iterators.IteratorUtil; + import org.apache.accumulo.core.security.Authorizations; + import org.apache.accumulo.core.security.ColumnVisibility; + import org.apache.accumulo.core.util.UtilWaitThread; + import org.apache.accumulo.harness.AccumuloClusterIT; + import org.apache.hadoop.io.Text; + import org.junit.Test; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + -import java.util.EnumSet; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - + import static com.google.common.base.Charsets.UTF_8; + import static org.junit.Assert.assertTrue; + import static org.junit.Assert.fail; + + /** + * ACCUMULO-2641 Integration test. ACCUMULO-2641 Adds scan id to thrift protocol so that {@code org.apache.accumulo.core.client.admin.ActiveScan.getScanid()} + * returns a unique scan id.<p> + * <p/> + * The test uses the Minicluster and the {@code org.apache.accumulo.test.functional.SlowIterator} to create multiple scan sessions. + * The test exercises multiple tablet servers with splits and multiple ranges to force the scans to occur across multiple tablet servers + * for completeness. + * <p/> + * This patch modified thrift, the TraceRepoDeserializationTest test seems to fail unless the following be added: + * <p/> + * private static final long serialVersionUID = -4659975753252858243l; + * <p/> + * back into org.apache.accumulo.trace.thrift.TInfo until that test signature is regenerated. + */ + public class ScanIdIT extends AccumuloClusterIT { + + private static final Logger log = LoggerFactory.getLogger(ScanIdIT.class); + + private static final int NUM_SCANNERS = 8; + + private static final int NUM_DATA_ROWS = 100; + + private static final Random random = new Random(); + + private static final ExecutorService pool = Executors.newFixedThreadPool(NUM_SCANNERS); + + private static volatile boolean testInProgress = true; + + private static final Map<Integer,Value> resultsByWorker = new ConcurrentHashMap<Integer,Value>(); + + @Override + protected int defaultTimeoutSeconds() { + return 60; + } + + /** + * @throws Exception any exception is a test failure. + */ + @Test + public void testScanId() throws Exception { + + final String tableName = getUniqueNames(1)[0]; + Connector conn = getConnector(); + conn.tableOperations().create(tableName); + + addSplits(conn, tableName); + + generateSampleData(conn, tableName); + + attachSlowIterator(conn, tableName); + + for (int scannerIndex = 0; scannerIndex < NUM_SCANNERS; scannerIndex++) { + ScannerThread st = new ScannerThread(conn, scannerIndex, tableName); + pool.submit(st); + } + + // wait for scanners to report a result. + while (testInProgress) { + + if (resultsByWorker.size() < NUM_SCANNERS) { + log.trace("Results reported {}", resultsByWorker.size()); + UtilWaitThread.sleep(750); + } else { + // each worker has reported at least one result. + testInProgress = false; + + log.debug("Final result count {}", resultsByWorker.size()); + + // delay to allow scanners to react to end of test and cleanly close. + UtilWaitThread.sleep(1000); + } + + } + + // all scanner have reported at least 1 result, so check for unique scan ids. + Set<Long> scanIds = new HashSet<Long>(); + + List<String> tservers = conn.instanceOperations().getTabletServers(); + + log.debug("tablet servers {}", tservers.toString()); + + for (String tserver : tservers) { + + List<ActiveScan> activeScans = conn.instanceOperations().getActiveScans(tserver); + + log.debug("TServer {} has {} active scans", tserver, activeScans.size()); + + for (ActiveScan scan : activeScans) { + log.debug("Tserver {} scan id {}", tserver, scan.getScanid()); + scanIds.add(scan.getScanid()); + } + } + + assertTrue(NUM_SCANNERS <= scanIds.size()); + + } + + /** + * Runs scanner in separate thread to allow multiple scanners to execute in parallel. + * <p/> + * The thread run method is terminated when the testInProgress flag is set to false. + */ + private static class ScannerThread implements Runnable { + + private final Connector connector; + private Scanner scanner = null; + private final int workerIndex; + private final String tablename; + + public ScannerThread(final Connector connector, final int workerIndex, final String tablename) { + + this.connector = connector; + this.workerIndex = workerIndex; + this.tablename = tablename; + + } + + /** + * execute the scan across the sample data and put scan result into result map until + * testInProgress flag is set to false. + */ + @Override public void run() { + + /* + * set random initial delay of up to to + * allow scanners to proceed to different points. + */ + + long delay = random.nextInt(5000); + + log.trace("Start delay for worker thread {} is {}", workerIndex, delay); + + UtilWaitThread.sleep(delay); + + try { + + scanner = connector.createScanner(tablename, new Authorizations()); + + // Never start readahead + scanner.setReadaheadThreshold(Long.MAX_VALUE); + scanner.setBatchSize(1); + + // create different ranges to try to hit more than one tablet. + scanner.setRange(new Range(new Text(Integer.toString(workerIndex)), new Text("9"))); + + } catch (TableNotFoundException e) { + throw new IllegalStateException("Initialization failure. Could not create scanner", e); + } + + scanner.fetchColumnFamily(new Text("fam1")); + + for (Map.Entry<Key,Value> entry : scanner) { + + // exit when success condition is met. + if (!testInProgress) { + scanner.clearScanIterators(); + scanner.close(); + + return; + } + + Text row = entry.getKey().getRow(); + + log.trace("worker {}, row {}", workerIndex, row.toString()); + + if (entry.getValue() != null) { + + Value prevValue = resultsByWorker.put(workerIndex, entry.getValue()); + + // value should always being increasing + if (prevValue != null) { + + log.trace("worker {} values {}", workerIndex, String.format("%1$s < %2$s", prevValue, entry.getValue())); + + assertTrue(prevValue.compareTo(entry.getValue()) > 0); + } + } else { + log.info("Scanner returned null"); + fail("Scanner returned unexpected null value"); + } + + } + + log.debug("Scanner ran out of data. (info only, not an error) "); + + } + } + + /** + * Create splits on table and force migration by taking table offline and then bring back + * online for test. + * + * @param conn Accumulo connector Accumulo connector to test cluster or MAC instance. + */ + private void addSplits(final Connector conn, final String tableName) { + + SortedSet<Text> splits = createSplits(); + + try { + + conn.tableOperations().addSplits(tableName, splits); + + conn.tableOperations().offline(tableName, true); + + UtilWaitThread.sleep(2000); + conn.tableOperations().online(tableName, true); + + for (Text split : conn.tableOperations().listSplits(tableName)) { + log.trace("Split {}", split); + } + + } catch (AccumuloSecurityException e) { + throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e); + } catch (TableNotFoundException e) { + throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e); + } catch (AccumuloException e) { + throw new IllegalStateException("Initialization failed. Could not add splits to " + tableName, e); + } + + } + + /** + * Create splits to distribute data across multiple tservers. + * + * @return splits in sorted set for addSplits. + */ + private SortedSet<Text> createSplits() { + + SortedSet<Text> splits = new TreeSet<Text>(); + + for (int split = 0; split < 10; split++) { + splits.add(new Text(Integer.toString(split))); + } + + return splits; + } + + /** + * Generate some sample data using random row id to distribute across splits. + * <p/> + * The primary goal is to determine that each scanner is assigned a unique scan id. + * This test does check that the count value for fam1 increases if a scanner reads multiple value, but this is + * secondary consideration for this test, that is included for completeness. + * + * @param connector Accumulo connector Accumulo connector to test cluster or MAC instance. + */ + private void generateSampleData(Connector connector, final String tablename) { + + try { + + BatchWriter bw = connector.createBatchWriter(tablename, new BatchWriterConfig()); + + ColumnVisibility vis = new ColumnVisibility("public"); + + for (int i = 0; i < NUM_DATA_ROWS; i++) { + + Text rowId = new Text(String.format("%d", ((random.nextInt(10) * 100) + i))); + + Mutation m = new Mutation(rowId); + m.put(new Text("fam1"), new Text("count"), new Value(Integer.toString(i).getBytes(UTF_8))); + m.put(new Text("fam1"), new Text("positive"), vis, new Value(Integer.toString(NUM_DATA_ROWS - i).getBytes(UTF_8))); + m.put(new Text("fam1"), new Text("negative"), vis, new Value(Integer.toString(i - NUM_DATA_ROWS).getBytes(UTF_8))); + + log.trace("Added row {}", rowId); + + bw.addMutation(m); + } + + bw.close(); + } catch (TableNotFoundException ex) { + throw new IllegalStateException("Initialization failed. Could not create test data", ex); + } catch (MutationsRejectedException ex) { + throw new IllegalStateException("Initialization failed. Could not create test data", ex); + } + } + + /** + * Attach the test slow iterator so that we have time to read the scan id without creating a large dataset. Uses a + * fairly large sleep and delay times because we are not concerned with how much data is read and we do not read + * all of the data - the test stops once each scanner reports a scan id. + * + * @param connector Accumulo connector Accumulo connector to test cluster or MAC instance. + */ + private void attachSlowIterator(Connector connector, final String tablename) { + try { + + IteratorSetting slowIter = new IteratorSetting(50, "slowIter", "org.apache.accumulo.test.functional.SlowIterator"); + slowIter.addOption("sleepTime", "200"); + slowIter.addOption("seekSleepTime", "200"); + + connector.tableOperations().attachIterator(tablename, slowIter, EnumSet.of(IteratorUtil.IteratorScope.scan)); + + } catch (AccumuloException ex) { + throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex); + } catch (TableNotFoundException ex) { + throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex); + } catch (AccumuloSecurityException ex) { + throw new IllegalStateException("Initialization failed. Could not attach slow iterator", ex); + } + } + + }