ACCUMULO-1000 Added timeout & config to conditional writer. Added unit test
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/79019ef0 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/79019ef0 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/79019ef0 Branch: refs/heads/ACCUMULO-1000 Commit: 79019ef0477b76966e2aff7259443aa9cd2f1cce Parents: 5183ae4 Author: Keith Turner <ktur...@apache.org> Authored: Tue Jul 23 12:07:41 2013 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Tue Jul 23 12:11:06 2013 -0400 ---------------------------------------------------------------------- .../accumulo/core/client/ConditionalWriter.java | 35 ++---- .../core/client/ConditionalWriterConfig.java | 118 +++++++++++++++++++ .../apache/accumulo/core/client/Connector.java | 9 +- .../core/client/impl/ConditionalWriterImpl.java | 96 ++++++++++----- .../core/client/impl/ConnectorImpl.java | 7 +- .../core/client/mock/MockConnector.java | 3 +- .../server/tabletserver/TabletServer.java | 2 +- .../accumulo/test/FaultyConditionalWriter.java | 9 -- .../accumulo/test/functional/SlowIterator.java | 24 +++- .../accumulo/test/ConditionalWriterTest.java | 115 ++++++++++++++---- 10 files changed, 313 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java index b434463..db29492 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriter.java @@ -18,8 +18,8 @@ package org.apache.accumulo.core.client; import java.util.Iterator; -import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.client.impl.thrift.SecurityErrorCode; import org.apache.accumulo.core.data.ConditionalMutation; /** @@ -48,11 +48,11 @@ public interface ConditionalWriter { public Status getStatus() throws AccumuloException, AccumuloSecurityException { if (status == null) { if (exception instanceof AccumuloException) - throw (AccumuloException) exception; - if (exception instanceof AccumuloSecurityException) - throw (AccumuloSecurityException) exception; - if (exception instanceof RuntimeException) - throw (RuntimeException) exception; + throw new AccumuloException(exception); + if (exception instanceof AccumuloSecurityException) { + AccumuloSecurityException ase = (AccumuloSecurityException) exception; + throw new AccumuloSecurityException(ase.getUser(), SecurityErrorCode.valueOf(ase.getSecurityErrorCode().name()), ase.getTableInfo(), ase); + } else throw new AccumuloException(exception); } @@ -94,33 +94,12 @@ public interface ConditionalWriter { * A condition contained a column visibility that could never be seen */ INVISIBLE_VISIBILITY, - /** - * nothing was done with this mutation, this is caused by previous mutations failing in some way like timing out - */ - IGNORED + } public abstract Iterator<Result> write(Iterator<ConditionalMutation> mutations); public abstract Result write(ConditionalMutation mutation); - - /** - * This setting determines how long a scanner will automatically retry when a failure occurs. By default a scanner will retry forever. - * - * Setting to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS means to retry forever. - * - * @param timeOut - * @param timeUnit - * determines how timeout is interpreted - */ - public void setTimeout(long timeOut, TimeUnit timeUnit); - - /** - * Returns the setting for how long a scanner will automatically retry when a failure occurs. - * - * @return the timeout configured for this scanner - */ - public long getTimeout(TimeUnit timeUnit); public void close(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java new file mode 100644 index 0000000..f2a91ea --- /dev/null +++ b/core/src/main/java/org/apache/accumulo/core/client/ConditionalWriterConfig.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.accumulo.core.client; + +import java.util.concurrent.TimeUnit; + +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.util.ArgumentChecker; + +/** + * + * @since 1.6.0 + */ +public class ConditionalWriterConfig { + + private static final Long DEFAULT_TIMEOUT = Long.MAX_VALUE; + private Long timeout = null; + + private static final Integer DEFAULT_MAX_WRITE_THREADS = 3; + private Integer maxWriteThreads = null; + + private Authorizations auths = Authorizations.EMPTY; + + /** + * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in must be + * a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are passed, then an + * exception will be thrown. + * + * <p> + * Any condition that is not visible with this set of authorizations will fail. + * + * @param auths + */ + public ConditionalWriterConfig setAuthorizations(Authorizations auths) { + ArgumentChecker.notNull(auths); + this.auths = auths; + return this; + } + + /** + * Sets the maximum amount of time an unresponsive server will be re-tried. When this timeout is exceeded, the {@link ConditionalWriter} should return the + * mutation with an exception.<br /> + * For no timeout, set to zero, or {@link Long#MAX_VALUE} with {@link TimeUnit#MILLISECONDS}. + * + * <p> + * {@link TimeUnit#MICROSECONDS} or {@link TimeUnit#NANOSECONDS} will be truncated to the nearest {@link TimeUnit#MILLISECONDS}.<br /> + * If this truncation would result in making the value zero when it was specified as non-zero, then a minimum value of one {@link TimeUnit#MILLISECONDS} will + * be used. + * + * <p> + * <b>Default:</b> {@link Long#MAX_VALUE} (no timeout) + * + * @param timeout + * the timeout, in the unit specified by the value of {@code timeUnit} + * @param timeUnit + * determines how {@code timeout} will be interpreted + * @throws IllegalArgumentException + * if {@code timeout} is less than 0 + * @return {@code this} to allow chaining of set methods + */ + public ConditionalWriterConfig setTimeout(long timeout, TimeUnit timeUnit) { + if (timeout < 0) + throw new IllegalArgumentException("Negative timeout not allowed " + timeout); + + if (timeout == 0) + this.timeout = Long.MAX_VALUE; + else + // make small, positive values that truncate to 0 when converted use the minimum millis instead + this.timeout = Math.max(1, timeUnit.toMillis(timeout)); + return this; + } + + /** + * Sets the maximum number of threads to use for writing data to the tablet servers. + * + * <p> + * <b>Default:</b> 3 + * + * @param maxWriteThreads + * the maximum threads to use + * @throws IllegalArgumentException + * if {@code maxWriteThreads} is non-positive + * @return {@code this} to allow chaining of set methods + */ + public ConditionalWriterConfig setMaxWriteThreads(int maxWriteThreads) { + if (maxWriteThreads <= 0) + throw new IllegalArgumentException("Max threads must be positive " + maxWriteThreads); + + this.maxWriteThreads = maxWriteThreads; + return this; + } + + public Authorizations getAuthorizations() { + return auths; + } + + public long getTimeout(TimeUnit timeUnit) { + return timeUnit.convert(timeout != null ? timeout : DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS); + } + + public int getMaxWriteThreads() { + return maxWriteThreads != null ? maxWriteThreads : DEFAULT_MAX_WRITE_THREADS; + } +} http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/Connector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/Connector.java b/core/src/main/java/org/apache/accumulo/core/client/Connector.java index 45a8162..bbfa55f 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/Connector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/Connector.java @@ -178,16 +178,15 @@ public abstract class Connector { * * @param tableName * the name of the table to query data from - * @param authorizations - * A set of authorization labels that will be checked against the column visibility of each key in order to filter data. The authorizations passed in - * must be a subset of the accumulo user's set of authorizations. If the accumulo user has authorizations (A1, A2) and authorizations (A2, A3) are - * passed, then an exception will be thrown. + * @param config + * configuration used to create conditional writer * * @return ConditionalWriter object for writing ConditionalMutations * @throws TableNotFoundException * when the specified table doesn't exist + * @since 1.6.0 */ - public abstract ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException; + public abstract ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException; /** * Accessor method for internal instance object. http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java index ed20054..55aa718 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConditionalWriterImpl.java @@ -37,12 +37,15 @@ import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.AccumuloException; import org.apache.accumulo.core.client.AccumuloSecurityException; import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.TableDeletedException; import org.apache.accumulo.core.client.TableNotFoundException; import org.apache.accumulo.core.client.TableOfflineException; +import org.apache.accumulo.core.client.TimedOutException; import org.apache.accumulo.core.client.impl.TabletLocator.TabletServerMutations; import org.apache.accumulo.core.client.impl.thrift.ThriftSecurityException; +import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Condition; import org.apache.accumulo.core.data.ConditionalMutation; @@ -94,6 +97,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private TCredentials credentials; private TabletLocator locator; private String tableId; + private long timeout; private static class ServerQueue { BlockingQueue<TabletServerMutations<QCMutation>> queue = new LinkedBlockingQueue<TabletServerMutations<QCMutation>>(); @@ -125,7 +129,6 @@ class ConditionalWriterImpl implements ConditionalWriter { throw new NoSuchElementException(); try { - // TODO maybe call drainTo after take() to get a batch efficiently Result result = rq.poll(1, TimeUnit.SECONDS); while (result == null) { @@ -153,12 +156,14 @@ class ConditionalWriterImpl implements ConditionalWriter { private BlockingQueue<Result> resultQueue; private long resetTime; private long delay = 50; + private long entryTime; - QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue) { + QCMutation(ConditionalMutation cm, BlockingQueue<Result> resultQueue, long entryTime) { super(cm); this.resultQueue = resultQueue; + this.entryTime = entryTime; } - + @Override public int compareTo(Delayed o) { QCMutation oqcm = (QCMutation) o; @@ -171,7 +176,6 @@ class ConditionalWriterImpl implements ConditionalWriter { } void resetDelay() { - // TODO eventually timeout a mutation delay = Math.min(delay * 2, MAX_SLEEP); resetTime = System.currentTimeMillis(); } @@ -190,12 +194,37 @@ class ConditionalWriterImpl implements ConditionalWriter { return serverQueue; } - private void queueRetry(List<QCMutation> mutations) { - for (QCMutation qcm : mutations) { - qcm.resetDelay(); - } + private void queueRetry(List<QCMutation> mutations, String server) { - failedMutations.addAll(mutations); + if (timeout < Long.MAX_VALUE) { + + long time = System.currentTimeMillis(); + + ArrayList<QCMutation> mutations2 = new ArrayList<ConditionalWriterImpl.QCMutation>(mutations.size()); + + for (QCMutation qcm : mutations) { + qcm.resetDelay(); + if (time + qcm.getDelay(TimeUnit.MILLISECONDS) > qcm.entryTime + timeout) { + TimedOutException toe; + if (server != null) + toe = new TimedOutException(Collections.singleton(server)); + else + toe = new TimedOutException("Conditional mutation timed out"); + + qcm.resultQueue.add(new Result(toe, qcm, server)); + } else { + mutations2.add(qcm); + } + } + + if (mutations2.size() > 0) + failedMutations.addAll(mutations2); + + } else { + for (QCMutation qcm : mutations) + qcm.resetDelay(); + failedMutations.addAll(mutations); + } } private void queue(List<QCMutation> mutations) { @@ -221,7 +250,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } if (failures.size() > 0) - queueRetry(failures); + queueRetry(failures, null); for (Entry<String,TabletServerMutations<QCMutation>> entry : binnedMutations.entrySet()) { queue(entry.getKey(), entry.getValue()); @@ -293,17 +322,17 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, Authorizations authorizations) { + ConditionalWriterImpl(Instance instance, TCredentials credentials, String tableId, ConditionalWriterConfig config) { this.instance = instance; this.credentials = credentials; - this.auths = authorizations; - this.ve = new VisibilityEvaluator(authorizations); - // TODO make configurable - this.threadPool = new ScheduledThreadPoolExecutor(3); - this.threadPool.setMaximumPoolSize(3); + this.auths = config.getAuthorizations(); + this.ve = new VisibilityEvaluator(config.getAuthorizations()); + this.threadPool = new ScheduledThreadPoolExecutor(config.getMaxWriteThreads()); + this.threadPool.setMaximumPoolSize(config.getMaxWriteThreads()); this.locator = TabletLocator.getLocator(instance, new Text(tableId)); this.serverQueues = new HashMap<String,ServerQueue>(); this.tableId = tableId; + this.timeout = config.getTimeout(TimeUnit.MILLISECONDS); Runnable failureHandler = new Runnable() { @@ -328,6 +357,8 @@ class ConditionalWriterImpl implements ConditionalWriter { int count = 0; + long entryTime = System.currentTimeMillis(); + mloop: while (mutations.hasNext()) { // TODO stop reading from iterator if too much memory ConditionalMutation mut = mutations.next(); @@ -341,7 +372,7 @@ class ConditionalWriterImpl implements ConditionalWriter { } // copy the mutations so that even if caller changes it, it will not matter - mutationList.add(new QCMutation(mut, resultQueue)); + mutationList.add(new QCMutation(mut, resultQueue, entryTime)); } queue(mutationList); @@ -438,6 +469,15 @@ class ConditionalWriterImpl implements ConditionalWriter { } } + private TabletClientService.Iface getClient(String location) throws TTransportException { + TabletClientService.Iface client; + if (timeout < instance.getConfiguration().getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)) + client = ThriftUtil.getTServerClient(location, timeout); + else + client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); + return client; + } + private void sendToServer(String location, TabletServerMutations<QCMutation> mutations) { TabletClientService.Iface client = null; @@ -449,7 +489,8 @@ class ConditionalWriterImpl implements ConditionalWriter { Long sessionId = null; try { - client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); + + client = getClient(location); Map<TKeyExtent,List<TConditionalMutation>> tmutations = new HashMap<TKeyExtent,List<TConditionalMutation>>(); @@ -486,7 +527,7 @@ class ConditionalWriterImpl implements ConditionalWriter { locator.invalidateCache(ke); } - queueRetry(ignored); + queueRetry(ignored, location); } catch (ThriftSecurityException tse) { AccumuloSecurityException ase = new AccumuloSecurityException(credentials.getPrincipal(), tse.getCode(), Tables.getPrintableTableInfoFromId(instance, @@ -508,11 +549,12 @@ class ConditionalWriterImpl implements ConditionalWriter { } } - private void queueRetry(Map<Long,CMK> cmidToCm) { + + private void queueRetry(Map<Long,CMK> cmidToCm, String location) { ArrayList<QCMutation> ignored = new ArrayList<QCMutation>(); for (CMK cmk : cmidToCm.values()) ignored.add(cmk.cm); - queueRetry(ignored); + queueRetry(ignored, location); } private void queueException(String location, Map<Long,CMK> cmidToCm, Exception e) { @@ -522,7 +564,7 @@ class ConditionalWriterImpl implements ConditionalWriter { private void invalidateSession(String location, TabletServerMutations<QCMutation> mutations, Map<Long,CMK> cmidToCm, Long sessionId) { if(sessionId == null){ - queueRetry(cmidToCm); + queueRetry(cmidToCm, location); }else{ try { invalidateSession(sessionId, location, mutations); @@ -587,7 +629,7 @@ class ConditionalWriterImpl implements ConditionalWriter { TInfo tinfo = Tracer.traceInfo(); try { - client = ThriftUtil.getTServerClient(location, instance.getConfiguration()); + client = getClient(location); client.invalidateConditionalUpdate(tinfo, sessionId); } finally { ThriftUtil.returnClient((TServiceClient) client); @@ -678,14 +720,6 @@ class ConditionalWriterImpl implements ConditionalWriter { return write(Collections.singleton(mutation).iterator()).next(); } - public void setTimeout(long timeOut, TimeUnit timeUnit) { - throw new UnsupportedOperationException(); - } - - public long getTimeout(TimeUnit timeUnit) { - throw new UnsupportedOperationException(); - } - @Override public void close() { //TODO could possible close cached sessions using async method to clean up sessions on server side http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java index 693f3c9..57e36fd 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java +++ b/core/src/main/java/org/apache/accumulo/core/client/impl/ConnectorImpl.java @@ -25,6 +25,7 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -131,10 +132,8 @@ public class ConnectorImpl extends Connector { } @Override - public ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException { - ArgumentChecker.notNull(tableName, authorizations); - // TODO resolve table name to table id here and pass that - return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), authorizations); + public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { + return new ConditionalWriterImpl(instance, credentials, getTableId(tableName), config); } @Override http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java index 4a405aa..4af2ea5 100644 --- a/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java +++ b/core/src/main/java/org/apache/accumulo/core/client/mock/MockConnector.java @@ -23,6 +23,7 @@ import org.apache.accumulo.core.client.BatchScanner; import org.apache.accumulo.core.client.BatchWriter; import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; +import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.Instance; import org.apache.accumulo.core.client.MultiTableBatchWriter; @@ -130,7 +131,7 @@ public class MockConnector extends Connector { } @Override - public ConditionalWriter createConditionalWriter(String tableName, Authorizations authorizations) throws TableNotFoundException { + public ConditionalWriter createConditionalWriter(String tableName, ConditionalWriterConfig config) throws TableNotFoundException { // TODO add implementation throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java index 4f7ba92..c1a1fc3 100644 --- a/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java +++ b/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServer.java @@ -1933,7 +1933,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu ConditionalSession cs = (ConditionalSession) sessionManager.reserveSession(sessID); - if(cs == null) + if (cs == null || cs.interruptFlag.get()) throw new NoSuchScanIDException(); Text tid = new Text(cs.tableId); http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java index de56218..7e7480f 100644 --- a/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java +++ b/test/src/main/java/org/apache/accumulo/test/FaultyConditionalWriter.java @@ -20,7 +20,6 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.Random; -import java.util.concurrent.TimeUnit; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.data.ConditionalMutation; @@ -74,14 +73,6 @@ public class FaultyConditionalWriter implements ConditionalWriter { return write(Collections.singleton(mutation).iterator()).next(); } - public void setTimeout(long timeOut, TimeUnit timeUnit) { - cw.setTimeout(timeOut, timeUnit); - } - - public long getTimeout(TimeUnit timeUnit) { - return cw.getTimeout(timeUnit); - } - @Override public void close() { cw.close(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java index a71b1ad..03eaefb 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java @@ -17,10 +17,13 @@ package org.apache.accumulo.test.functional; import java.io.IOException; +import java.util.Collection; import java.util.Map; import org.apache.accumulo.core.client.IteratorSetting; +import org.apache.accumulo.core.data.ByteSequence; import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Range; import org.apache.accumulo.core.data.Value; import org.apache.accumulo.core.iterators.IteratorEnvironment; import org.apache.accumulo.core.iterators.SortedKeyValueIterator; @@ -30,13 +33,19 @@ import org.apache.accumulo.core.util.UtilWaitThread; public class SlowIterator extends WrappingIterator { static private final String SLEEP_TIME = "sleepTime"; + static private final String SEEK_SLEEP_TIME = "seekSleepTime"; - long sleepTime; + private long sleepTime = 0; + private long seekSleepTime = 0; public static void setSleepTime(IteratorSetting is, long millis) { is.addOption(SLEEP_TIME, Long.toString(millis)); } + public static void setSeekSleepTime(IteratorSetting is, long t) { + is.addOption(SEEK_SLEEP_TIME, Long.toString(t)); + } + @Override public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) { throw new UnsupportedOperationException(); @@ -49,9 +58,20 @@ public class SlowIterator extends WrappingIterator { } @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) throws IOException { + UtilWaitThread.sleep(seekSleepTime); + super.seek(range, columnFamilies, inclusive); + } + + @Override public void init(SortedKeyValueIterator<Key,Value> source, Map<String,String> options, IteratorEnvironment env) throws IOException { super.init(source, options, env); - sleepTime = Long.parseLong(options.get(SLEEP_TIME)); + if (options.containsKey(SLEEP_TIME)) + sleepTime = Long.parseLong(options.get(SLEEP_TIME)); + + if (options.containsKey(SEEK_SLEEP_TIME)) + seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME)); } + } http://git-wip-us.apache.org/repos/asf/accumulo/blob/79019ef0/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java ---------------------------------------------------------------------- diff --git a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java index 65a5636..66b699e 100644 --- a/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java +++ b/test/src/test/java/org/apache/accumulo/test/ConditionalWriterTest.java @@ -41,6 +41,7 @@ import org.apache.accumulo.core.client.BatchWriterConfig; import org.apache.accumulo.core.client.ConditionalWriter; import org.apache.accumulo.core.client.ConditionalWriter.Result; import org.apache.accumulo.core.client.ConditionalWriter.Status; +import org.apache.accumulo.core.client.ConditionalWriterConfig; import org.apache.accumulo.core.client.Connector; import org.apache.accumulo.core.client.IsolatedScanner; import org.apache.accumulo.core.client.IteratorSetting; @@ -74,6 +75,7 @@ import org.apache.accumulo.examples.simple.constraints.AlphaNumKeyConstraint; import org.apache.accumulo.minicluster.MiniAccumuloCluster; import org.apache.accumulo.minicluster.MiniAccumuloConfig; import org.apache.accumulo.test.functional.BadIterator; +import org.apache.accumulo.test.functional.SlowIterator; import org.apache.hadoop.io.Text; import org.junit.AfterClass; import org.junit.Assert; @@ -107,7 +109,7 @@ public class ConditionalWriterTest { conn.tableOperations().create("foo"); - ConditionalWriter cw = conn.createConditionalWriter("foo", Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter("foo", new ConditionalWriterConfig()); // mutation conditional on column tx:seq not exiting ConditionalMutation cm0 = new ConditionalMutation("99006", new Condition("tx", "seq")); @@ -190,7 +192,7 @@ public class ConditionalWriterTest { conn.securityOperations().changeUserAuthorizations("root", auths); - ConditionalWriter cw = conn.createConditionalWriter(table, auths); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(auths)); ColumnVisibility cva = new ColumnVisibility("A"); ColumnVisibility cvb = new ColumnVisibility("B"); @@ -278,7 +280,7 @@ public class ConditionalWriterTest { Authorizations filteredAuths = new Authorizations("A"); - ConditionalWriter cw = conn.createConditionalWriter(table, filteredAuths); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(filteredAuths)); ColumnVisibility cva = new ColumnVisibility("A"); ColumnVisibility cvb = new ColumnVisibility("B"); @@ -340,6 +342,25 @@ public class ConditionalWriterTest { Assert.assertEquals(Status.INVISIBLE_VISIBILITY, cw.write(cm7).getStatus()); cw.close(); + + // test passing auths that exceed users configured auths + + Authorizations exceedingAuths = new Authorizations("A", "B", "D"); + ConditionalWriter cw2 = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(exceedingAuths)); + + ConditionalMutation cm8 = new ConditionalMutation("99006", new Condition("tx", "seq").setVisibility(cvb), new Condition("tx", "seq").setVisibility(cva) + .setValue("1")); + cm8.put("name", "last", cva, "doe"); + cm8.put("name", "first", cva, "john"); + cm8.put("tx", "seq", cva, "1"); + + try { + cw2.write(cm8).getStatus(); + Assert.assertTrue(false); + } catch (AccumuloSecurityException ase) {} + + + cw2.close(); } @Test @@ -356,7 +377,7 @@ public class ConditionalWriterTest { Scanner scanner = conn.createScanner(table + "_clone", new Authorizations()); - ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new Authorizations()); + ConditionalWriter cw = conn.createConditionalWriter(table + "_clone", new ConditionalWriterConfig()); ConditionalMutation cm0 = new ConditionalMutation("99006+", new Condition("tx", "seq")); cm0.put("tx", "seq", "1"); @@ -421,7 +442,7 @@ public class ConditionalWriterTest { Assert.assertEquals("3", scanner.iterator().next().getValue().toString()); - ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations()); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); ConditionalMutation cm0 = new ConditionalMutation("ACCUMULO-1000", new Condition("count", "comments").setValue("3")); cm0.put("count", "comments", "1"); @@ -504,7 +525,7 @@ public class ConditionalWriterTest { cm2.put("tx", "seq", cvab, "1"); mutations.add(cm2); - ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A")); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A"))); Iterator<Result> results = cw.write(mutations.iterator()); int count = 0; while (results.hasNext()) { @@ -611,7 +632,7 @@ public class ConditionalWriterTest { cml.add(cm); } - ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); Iterator<Result> results = cw.write(cml.iterator()); @@ -704,7 +725,7 @@ public class ConditionalWriterTest { cm3.put("tx", "seq", cvaob, "2"); mutations.add(cm3); - ConditionalWriter cw = conn.createConditionalWriter(table, new Authorizations("A")); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setAuthorizations(new Authorizations("A"))); Iterator<Result> results = cw.write(mutations.iterator()); HashSet<String> rows = new HashSet<String>(); while (results.hasNext()) { @@ -745,7 +766,7 @@ public class ConditionalWriterTest { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); cm1.put("tx", "seq", "1"); @@ -942,7 +963,7 @@ public class ConditionalWriterTest { break; } - ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); ArrayList<ByteSequence> rows = new ArrayList<ByteSequence>(); @@ -1026,9 +1047,9 @@ public class ConditionalWriterTest { cm1.put("tx", "seq", "1"); cm1.put("data", "x", "a"); - ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", Authorizations.EMPTY); - ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", Authorizations.EMPTY); - ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", Authorizations.EMPTY); + ConditionalWriter cw1 = conn2.createConditionalWriter("sect1", new ConditionalWriterConfig()); + ConditionalWriter cw2 = conn2.createConditionalWriter("sect2", new ConditionalWriterConfig()); + ConditionalWriter cw3 = conn2.createConditionalWriter("sect3", new ConditionalWriterConfig()); Assert.assertEquals(Status.ACCEPTED, cw3.write(cm1).getStatus()); @@ -1050,8 +1071,54 @@ public class ConditionalWriterTest { @Test - public void testTimeout() { - // TODO + public void testTimeout() throws Exception { + ZooKeeperInstance zki = new ZooKeeperInstance(cluster.getInstanceName(), cluster.getZooKeepers()); + Connector conn = zki.getConnector("root", new PasswordToken(secret)); + + String table = "fooT"; + + conn.tableOperations().create(table); + + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig().setTimeout(1, TimeUnit.SECONDS)); + + ConditionalMutation cm1 = new ConditionalMutation("r1", new Condition("tx", "seq")); + cm1.put("tx", "seq", "1"); + cm1.put("data", "x", "a"); + + Assert.assertEquals(cw.write(cm1).getStatus(), Status.ACCEPTED); + + IteratorSetting is = new IteratorSetting(5, SlowIterator.class); + SlowIterator.setSeekSleepTime(is, 4000); + + ConditionalMutation cm2 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1").setIterators(is)); + cm2.put("tx", "seq", "2"); + cm2.put("data", "x", "b"); + + Assert.assertEquals(cw.write(cm2).getStatus(), Status.UNKNOWN); + + Scanner scanner = conn.createScanner(table, Authorizations.EMPTY); + + for (Entry<Key,Value> entry : scanner) { + String cf = entry.getKey().getColumnFamilyData().toString(); + String cq = entry.getKey().getColumnQualifierData().toString(); + String val = entry.getValue().toString(); + + if (cf.equals("tx") && cq.equals("seq")) + Assert.assertEquals("1", val); + else if (cf.equals("data") && cq.equals("x")) + Assert.assertEquals("a", val); + else + Assert.assertTrue(false); + } + + ConditionalMutation cm3 = new ConditionalMutation("r1", new Condition("tx", "seq").setValue("1")); + cm3.put("tx", "seq", "2"); + cm3.put("data", "x", "b"); + + Assert.assertEquals(cw.write(cm3).getStatus(), Status.ACCEPTED); + + cw.close(); + } @Test @@ -1062,13 +1129,13 @@ public class ConditionalWriterTest { Connector conn = zki.getConnector("root", new PasswordToken(secret)); try { - conn.createConditionalWriter(table, Authorizations.EMPTY); + conn.createConditionalWriter(table, new ConditionalWriterConfig()); Assert.assertFalse(true); } catch (TableNotFoundException e) {} conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); conn.tableOperations().delete(table); @@ -1081,8 +1148,8 @@ public class ConditionalWriterTest { try { result.getStatus(); Assert.assertFalse(true); - } catch (TableDeletedException ae) { - + } catch (AccumuloException ae) { + Assert.assertEquals(TableDeletedException.class, ae.getCause().getClass()); } } @@ -1096,7 +1163,7 @@ public class ConditionalWriterTest { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); conn.tableOperations().offline(table); @@ -1111,14 +1178,14 @@ public class ConditionalWriterTest { try { result.getStatus(); Assert.assertFalse(true); - } catch (TableOfflineException ae) { - + } catch (AccumuloException ae) { + Assert.assertEquals(TableOfflineException.class, ae.getCause().getClass()); } cw.close(); try { - conn.createConditionalWriter(table, Authorizations.EMPTY); + conn.createConditionalWriter(table, new ConditionalWriterConfig()); Assert.assertFalse(true); } catch (TableOfflineException e) {} } @@ -1140,7 +1207,7 @@ public class ConditionalWriterTest { conn.tableOperations().create(table); - ConditionalWriter cw = conn.createConditionalWriter(table, Authorizations.EMPTY); + ConditionalWriter cw = conn.createConditionalWriter(table, new ConditionalWriterConfig()); IteratorSetting iterSetting = new IteratorSetting(5, BadIterator.class);