Author: kturner
Date: Tue Sep 18 16:48:00 2012
New Revision: 1387249
URL: http://svn.apache.org/viewvc?rev=1387249&view=rev
Log:
ACCUMULO-705 Modified API so that setTimeout() method is consistent across
Scanner, BatchScanner, and BatchWriter
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/BatchScanner.java
Tue Sep 18 16:48:00 2012
@@ -17,6 +17,7 @@
package org.apache.accumulo.core.client;
import java.util.Collection;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.data.Range;
@@ -47,14 +48,16 @@ public interface BatchScanner extends Sc
/**
* Sets a timeout threshold for a server to respond. The batch scanner will
accomplish as much work as possible before throwing an exception. BatchScanner
- * iterators will throw a {@link TimedOutException} when all needed servers
timeout.
+ * iterators will throw a {@link TimedOutException} when all needed servers
timeout. Setting the timeout to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS
+ * means no timeout.
*
* <p>
- * If not set, the timeout defaults to MAX_INT
+ * If not set, there is no timeout. The BatchScanner will retry forever.
*
* @param timeout
- * in seconds
+ * @param timeUnit
+ * determines how timeout is interpreted
*/
@Override
- void setTimeOut(int timeout);
+ void setTimeout(long timeout, TimeUnit timeUnit);
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
Tue Sep 18 16:48:00 2012
@@ -24,6 +24,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.impl.ScannerOptions;
import org.apache.accumulo.core.client.mock.IteratorAdapter;
@@ -53,7 +54,6 @@ import org.apache.hadoop.io.Text;
*/
public class ClientSideIteratorScanner extends ScannerOptions implements
Scanner {
private int size;
- private int timeOut;
private Range range;
private boolean isolated = false;
@@ -136,7 +136,7 @@ public class ClientSideIteratorScanner e
smi = new ScannerTranslator(scanner);
this.range = scanner.getRange();
this.size = scanner.getBatchSize();
- this.timeOut = scanner.getTimeOut();
+ this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
}
/**
@@ -151,7 +151,7 @@ public class ClientSideIteratorScanner e
@Override
public Iterator<Entry<Key,Value>> iterator() {
smi.scanner.setBatchSize(size);
- smi.scanner.setTimeOut(timeOut);
+ smi.scanner.setTimeout(timeOut, TimeUnit.MILLISECONDS);
if (isolated)
smi.scanner.enableIsolation();
else
@@ -208,13 +208,19 @@ public class ClientSideIteratorScanner e
}
@Override
- public void setTimeOut(final int timeOut) {
- this.timeOut = timeOut;
+ public void setTimeOut(int timeOut) {
+ if (timeOut == Integer.MAX_VALUE)
+ setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ else
+ setTimeout(timeOut, TimeUnit.SECONDS);
}
@Override
public int getTimeOut() {
- return timeOut;
+ long timeout = getTimeout(TimeUnit.SECONDS);
+ if (timeout >= Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+ return (int) timeout;
}
@Override
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/IsolatedScanner.java
Tue Sep 18 16:48:00 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.impl.IsolationException;
import org.apache.accumulo.core.client.impl.ScannerOptions;
@@ -45,11 +46,11 @@ public class IsolatedScanner extends Sca
private Entry<Key,Value> nextRowStart;
private Iterator<Entry<Key,Value>> rowIter;
private ByteSequence lastRow = null;
+ private long timeout;
private Scanner scanner;
private ScannerOptions opts;
private Range range;
- private int timeOut;
private int batchSize;
private void readRow() {
@@ -120,7 +121,7 @@ public class IsolatedScanner extends Sca
synchronized (scanner) {
scanner.enableIsolation();
scanner.setBatchSize(batchSize);
- scanner.setTimeOut(timeOut);
+ scanner.setTimeout(timeout, TimeUnit.MILLISECONDS);
scanner.setRange(r);
setOptions((ScannerOptions) scanner, opts);
@@ -129,11 +130,11 @@ public class IsolatedScanner extends Sca
}
}
- public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range
range, int timeOut, int batchSize, RowBufferFactory bufferFactory) {
+ public RowBufferingIterator(Scanner scanner, ScannerOptions opts, Range
range, long timeout, int batchSize, RowBufferFactory bufferFactory) {
this.scanner = scanner;
this.opts = new ScannerOptions(opts);
this.range = range;
- this.timeOut = timeOut;
+ this.timeout = timeout;
this.batchSize = batchSize;
buffer = bufferFactory.newBuffer();
@@ -208,7 +209,6 @@ public class IsolatedScanner extends Sca
private Scanner scanner;
private Range range;
- private int timeOut;
private int batchSize;
private RowBufferFactory bufferFactory;
@@ -219,7 +219,7 @@ public class IsolatedScanner extends Sca
public IsolatedScanner(Scanner scanner, RowBufferFactory bufferFactory) {
this.scanner = scanner;
this.range = scanner.getRange();
- this.timeOut = scanner.getTimeOut();
+ this.timeOut = scanner.getTimeout(TimeUnit.MILLISECONDS);
this.batchSize = scanner.getBatchSize();
this.bufferFactory = bufferFactory;
}
@@ -231,12 +231,18 @@ public class IsolatedScanner extends Sca
@Override
public void setTimeOut(int timeOut) {
- this.timeOut = timeOut;
+ if (timeOut == Integer.MAX_VALUE)
+ setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ else
+ setTimeout(timeOut, TimeUnit.SECONDS);
}
@Override
public int getTimeOut() {
- return timeOut;
+ long timeout = getTimeout(TimeUnit.SECONDS);
+ if (timeout >= Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+ return (int) timeout;
}
@Override
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/Scanner.java
Tue Sep 18 16:48:00 2012
@@ -28,6 +28,25 @@ import org.apache.accumulo.core.data.Ran
public interface Scanner extends ScannerBase {
/**
+ * This setting determines how long a scanner will automatically retry when
a failure occurs. By default a scanner will retry forever.
+ *
+ * @param timeOut
+ * in seconds
+ * @deprecated Since 1.5. See {@link ScannerBase#setTimeout(long,
java.util.concurrent.TimeUnit)}
+ */
+ @Deprecated
+ public void setTimeOut(int timeOut);
+
+ /**
+ * Returns the setting for how long a scanner will automatically retry when
a failure occurs.
+ *
+ * @return the timeout configured for this scanner
+ * @deprecated Since 1.5. See {@link
ScannerBase#getTimeout(java.util.concurrent.TimeUnit)}
+ */
+ @Deprecated
+ public int getTimeOut();
+
+ /**
* Sets the range of keys to scan over.
*
* @param range
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/ScannerBase.java
Tue Sep 18 16:48:00 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.core.client;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
@@ -104,15 +105,18 @@ public interface ScannerBase extends Ite
/**
* This setting determines how long a scanner will automatically retry when
a failure occurs. By default a scanner will retry forever.
*
+ * Setting to zero or Long.MAX_VALUE and TimeUnit.MILLISECONDS means to
retry forever.
+ *
* @param timeOut
- * in seconds
+ * @param timeUnit
+ * determines how timeout is interpreted
*/
- public void setTimeOut(int timeOut);
+ public void setTimeout(long timeOut, TimeUnit timeUnit);
/**
* Returns the setting for how long a scanner will automatically retry when
a failure occurs.
*
* @return the timeout configured for this scanner
*/
- public int getTimeOut();
+ public long getTimeout(TimeUnit timeUnit);
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerImpl.java
Tue Sep 18 16:48:00 2012
@@ -29,6 +29,7 @@ package org.apache.accumulo.core.client.
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Instance;
@@ -54,7 +55,6 @@ public class ScannerImpl extends Scanner
private Text table;
private int size;
- private int timeOut;
private Range range;
private boolean isolated = false;
@@ -68,7 +68,6 @@ public class ScannerImpl extends Scanner
this.authorizations = authorizations;
this.size = Constants.SCAN_BATCH_SIZE;
- this.timeOut = Integer.MAX_VALUE;
}
@Override
@@ -100,7 +99,7 @@ public class ScannerImpl extends Scanner
* Scanner object will have no effect on existing iterators.
*/
public synchronized Iterator<Entry<Key,Value>> iterator() {
- return new ScannerIterator(instance, credentials, table, authorizations,
range, size, timeOut, this, isolated);
+ return new ScannerIterator(instance, credentials, table, authorizations,
range, size, getTimeOut(), this, isolated);
}
@Override
@@ -112,4 +111,20 @@ public class ScannerImpl extends Scanner
public synchronized void disableIsolation() {
this.isolated = false;
}
+
+ @Override
+ public void setTimeOut(int timeOut) {
+ if (timeOut == Integer.MAX_VALUE)
+ setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ else
+ setTimeout(timeOut, TimeUnit.SECONDS);
+ }
+
+ @Override
+ public int getTimeOut() {
+ long timeout = getTimeout(TimeUnit.SECONDS);
+ if (timeout >= Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+ return (int) timeout;
+ }
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/ScannerOptions.java
Tue Sep 18 16:48:00 2012
@@ -26,6 +26,7 @@ import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.ScannerBase;
@@ -44,7 +45,7 @@ public class ScannerOptions implements S
protected SortedSet<Column> fetchedColumns = new TreeSet<Column>();
- protected int timeOut = Integer.MAX_VALUE;
+ protected long timeOut = Long.MAX_VALUE;
private String regexIterName = null;
@@ -185,16 +186,19 @@ public class ScannerOptions implements S
}
@Override
- public void setTimeOut(int timeOut) {
+ public void setTimeout(long timeout, TimeUnit timeUnit) {
if (timeOut <= 0) {
throw new IllegalArgumentException("TimeOut must be positive : " +
timeOut);
}
- this.timeOut = timeOut;
+ if (timeout == 0)
+ this.timeOut = Long.MAX_VALUE;
+ else
+ this.timeOut = timeUnit.toMillis(timeout);
}
@Override
- public int getTimeOut() {
+ public long getTimeout(TimeUnit timeunit) {
return timeOut;
}
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReader.java
Tue Sep 18 16:48:00 2012
@@ -107,6 +107,6 @@ public class TabletServerBatchReader ext
throw new IllegalStateException("batch reader closed");
}
- return new TabletServerBatchReaderIterator(instance, credentials, table,
authorizations, ranges, numThreads, queryThreadPool, this, timeOut * 1000l);
+ return new TabletServerBatchReaderIterator(instance, credentials, table,
authorizations, ranges, numThreads, queryThreadPool, this, timeOut);
}
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/impl/TabletServerBatchReaderIterator.java
Tue Sep 18 16:48:00 2012
@@ -626,7 +626,7 @@ public class TabletServerBatchReaderIter
TTransport transport = null;
try {
TabletClientService.Client client;
- if (timeoutTracker.getTimeOut() < Integer.MAX_VALUE * 1000l)
+ if (timeoutTracker.getTimeOut() < Long.MAX_VALUE)
client = ThriftUtil.getTServerClient(server, conf,
timeoutTracker.getTimeOut());
else
client = ThriftUtil.getTServerClient(server, conf);
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockBatchScanner.java
Tue Sep 18 16:48:00 2012
@@ -91,12 +91,4 @@ public class MockBatchScanner extends Mo
@Override
public void close() {}
-
- @Override
- public void setTimeOut(int timeout) {}
-
- @Override
- public int getTimeOut() {
- return Integer.MAX_VALUE;
- }
}
Modified:
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
(original)
+++
accumulo/trunk/core/src/main/java/org/apache/accumulo/core/client/mock/MockScanner.java
Tue Sep 18 16:48:00 2012
@@ -19,6 +19,7 @@ package org.apache.accumulo.core.client.
import java.io.IOException;
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.data.Key;
@@ -36,17 +37,22 @@ public class MockScanner extends MockSca
MockScanner(MockTable table, Authorizations auths) {
super(table, auths);
- timeOut = 0;
}
@Override
public void setTimeOut(int timeOut) {
- this.timeOut = timeOut;
+ if (timeOut == Integer.MAX_VALUE)
+ setTimeout(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ else
+ setTimeout(timeOut, TimeUnit.SECONDS);
}
@Override
public int getTimeOut() {
- return timeOut;
+ long timeout = getTimeout(TimeUnit.SECONDS);
+ if (timeout >= Integer.MAX_VALUE)
+ return Integer.MAX_VALUE;
+ return (int) timeout;
}
@Override
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/monitor/servlets/trace/NullScanner.java
Tue Sep 18 16:48:00 2012
@@ -18,6 +18,7 @@ package org.apache.accumulo.server.monit
import java.util.Iterator;
import java.util.Map.Entry;
+import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.Scanner;
@@ -90,4 +91,12 @@ public class NullScanner implements Scan
@Override
public void removeScanIterator(String iteratorName) {}
+ @Override
+ public void setTimeout(long timeOut, TimeUnit timeUnit) {}
+
+ @Override
+ public long getTimeout(TimeUnit timeUnit) {
+ return 0;
+ }
+
}
Modified:
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
URL:
http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java?rev=1387249&r1=1387248&r2=1387249&view=diff
==============================================================================
---
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
(original)
+++
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/test/functional/TimeoutTest.java
Tue Sep 18 16:48:00 2012
@@ -99,7 +99,7 @@ public class TimeoutTest extends Functio
bw.close();
BatchScanner bs = getConnector().createBatchScanner("timeout",
Constants.NO_AUTHS, 2);
- bs.setTimeOut(1);
+ bs.setTimeout(1, TimeUnit.SECONDS);
bs.setRanges(Collections.singletonList(new Range()));
// should not timeout