Dear All
When a query with an ORDER BY is cancelled, the component
Arrays.sort() that sorts the chunk(s) of the result
bindings runs to completion before the cancel finishes.
[See QueryIterSort and SortedDataBag.]
For a large result set, this results in a long wait
before the cancelled request finally finishes. This
can be inconvenient.
The cancel request can be sneaked into the sort by
way of the comparator [1] and adding an instance
variable `cancelled` to SortedDataBag, set `true`
from QueryIterSort.requestCancel(). The comparator
checks `cancelled` and if it has become `true`
throws an exception, which is then caught outside
the call to Arrays.sort(), abandoning the sort.
See attached diff.
Questions arising:
* is it safe to abandon a sort from inside a comparator?
(can't see anything that suggests otherwise.)
* are there threading issues that have to be deal with
other than by making the `cancelled` flag volatile?
If what I suggest appears to be sane I'll make it a
pull request and run the process.
Chris
[1] Using a wrapper to handle the test for cancellation
and then delegating `compare` to the comparator
supplied to SortedDataBag.
diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index 3deacce..0102935 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -69,19 +69,81 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
protected final ThresholdPolicy<E> policy;
protected final SerializationFactory<E> serializationFactory;
- protected final Comparator<? super E> comparator;
+ protected final CanAbortComparator comparator;
protected boolean finishedAdding = false;
protected boolean spilled = false;
protected boolean closed = false;
+ protected volatile boolean cancelled;
public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
{
this.policy = policy;
this.serializationFactory = serializerFactory;
- this.comparator = comparator;
+ this.comparator = new CanAbortComparator(comparator);
}
+ private final class CanAbortComparator implements Comparator<E>
+ {
+ /**
+ The test for whether the sort has been cancelled is
+ performed every <code>cancelTestFrequency</code> comparisons.
+ This reduces the (presumed) overhead of access to a
+ volatile boolean.
+ */
+ static final int cancelTestFrequency = 10000;
+
+ /**
+ Count of the number of times this comparator has been called.
+ */
+ int count = 0;
+
+ final Comparator<? super E> baseComparator;
+
+ public CanAbortComparator(Comparator<? super E> comparator)
+ {
+ this.baseComparator = comparator;
+ }
+
+ @Override public int compare(E o1, E o2)
+ {
+ count += 1;
+ if (count % cancelTestFrequency == 0)
+ {
+ if (cancelled) throw new AbandonSort();
+ }
+ return baseComparator.compare(o1, o2);
+ }
+
+ /**
+ Sort the array <code>e</code> using this comparator
+ with the additional ability to abort the sort.
+ */
+ public boolean abortableSort(E[] e) {
+ try { Arrays.sort(e, this); }
+ catch (AbandonSort s) { return true; }
+ return false;
+ }
+ }
+
+ /**
+ <code>AbandonSort</code> is the exception thrown from
+ <code>CanAbortComparator</code> to abandon a sort.
+ */
+ public static class AbandonSort extends RuntimeException
+ {
+ private static final long serialVersionUID = 1L;
+ }
+
+ /**
+ cancel arranges that further comparisons using the supplied
+ comparator will abandon the sort in progress.
+ */
+ public void cancel()
+ {
+ cancelled = true;
+ }
+
protected void checkClosed()
{
if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
@@ -118,7 +180,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
}
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({ "unchecked" })
protected void spill()
{
// Make sure we have something to spill.
@@ -134,25 +196,26 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
throw new AtlasException(e);
}
- // Sort the tuples
- // Collections.sort() will copy to an array, sort, and then copy back. Avoid that
- // extra copy by copying to an array and using Arrays.sort(). Also it lets us use
- // Collection<E> instead of List<E> as the type for the memory object. Unfortunately
- // because of Java's crazy generics we have to do it as an Object array.
- Object[] array = memory.toArray();
- Arrays.sort(array, (Comparator)comparator);
+ // Sort the tuples as an array. The CanAbortComparator will sort that
+ // array using Arrays.sort. The cast to E[] is safe. If the sort is
+ // aborted, don't bother messing around with the serialisation. We'll
+ // never get around to using it anyway.
- Sink<E> serializer = serializationFactory.createSerializer(out);
- try
+ E[] array = (E[]) memory.toArray();
+ if (!comparator.abortableSort(array))
{
- for (Object tuple : array)
- {
- serializer.send((E)tuple);
- }
- }
- finally
- {
- serializer.close();
+ Sink<E> serializer = serializationFactory.createSerializer(out);
+ try
+ {
+ for (Object tuple : array)
+ {
+ serializer.send((E)tuple);
+ }
+ }
+ finally
+ {
+ serializer.close();
+ }
}
spilled = true;
@@ -189,7 +252,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
return iterator(getSpillFiles().size());
}
- @SuppressWarnings({ "unchecked", "rawtypes" })
+ @SuppressWarnings({ "unchecked" })
private Iterator<E> iterator(int size)
{
checkClosed();
@@ -200,9 +263,13 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
if (!finishedAdding && memSize > 1)
{
// Again, some ugliness for speed
- Object[] array = memory.toArray();
- Arrays.sort(array, (Comparator)comparator);
- memory = Arrays.asList((E[])array);
+ E[] array = (E[]) memory.toArray();
+ if (comparator.abortableSort(array))
+ {
+ // if we comment this back in, we lose the timeout message!
+ // return Iter.nullIterator();
+ }
+ memory = Arrays.asList(array);
}
finishedAdding = true;
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
index ecba3a9..2593f09 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
@@ -67,6 +67,8 @@ public class QueryIterSort extends QueryIterPlainWrapper
@Override
public void requestCancel()
{
+ System.err.println(">> QueryIterSort.requestCancel()");
+ this.db.cancel();
this.embeddedIterator.cancel() ;
super.requestCancel() ;
}
diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java
index 4fa1249..60251db 100644
--- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java
+++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java
@@ -64,7 +64,8 @@ public class FusekiCmd {
}
static public void main(String... argv) {
- FusekiCmdInner.innerMain(argv);
+ String [] args = new String[] {"--config=/home/chris/Fuseki/apache-jena-fuseki-2.4.0/hacked-config.ttl"};
+ FusekiCmdInner.innerMain(args);
}
static class FusekiCmdInner extends CmdARQ {