Dear All

When a query with an ORDER BY is cancelled, the underlying
Arrays.sort() call that sorts the chunk(s) of the result
bindings runs to completion before the cancel finishes.
[See QueryIterSort and SortedDataBag.]

For a large result set, this results in a long wait
before the cancelled request finally finishes. This
can be inconvenient.

The cancel request can be sneaked into the sort by
way of the comparator [1] and by adding an instance
variable `cancelled` to SortedDataBag, set to `true`
from QueryIterSort.requestCancel(). The comparator
checks `cancelled` and, if it has become `true`,
throws an exception, which is then caught outside
the call to Arrays.sort(), abandoning the sort.
See attached diff.

Questions arising:

* is it safe to abandon a sort from inside a comparator?
  (can't see anything that suggests otherwise.)

* are there threading issues that have to be dealt with
  other than by making the `cancelled` flag volatile?

If what I suggest appears to be sane I'll make it a
 pull request and run the process.

Chris

[1] Using a wrapper to handle the test for cancellation
    and then delegating `compare` to the comparator
    supplied to SortedDataBag.





diff --git a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
index 3deacce..0102935 100644
--- a/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
+++ b/jena-arq/src/main/java/org/apache/jena/atlas/data/SortedDataBag.java
@@ -69,19 +69,81 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
     
     protected final ThresholdPolicy<E> policy;
     protected final SerializationFactory<E> serializationFactory;
-    protected final Comparator<? super E> comparator;
+    protected final CanAbortComparator comparator;
     
     protected boolean finishedAdding = false;
     protected boolean spilled = false;
     protected boolean closed = false;
+    protected volatile boolean cancelled;
     
     public SortedDataBag(ThresholdPolicy<E> policy, SerializationFactory<E> serializerFactory, Comparator<? super E> comparator)
     {
         this.policy = policy;
         this.serializationFactory = serializerFactory;
-        this.comparator = comparator;
+        this.comparator = new CanAbortComparator(comparator);
     }
     
+    private final class CanAbortComparator implements Comparator<E> 
+    	{
+    	/**
+    	    The test for whether the sort has been cancelled is
+    	    performed every <code>cancelTestFrequency</code> comparisons.
+    	    This reduces the (presumed) overhead of access to a
+    	    volatile boolean.    	    
+    	*/
+    	static final int cancelTestFrequency = 10000;
+    	
+    	/**
+    	    Count of the number of times this comparator has been called.
+    	*/
+		int count = 0;
+		
+		final Comparator<? super E> baseComparator;
+		
+		public CanAbortComparator(Comparator<? super E> comparator) 
+			{
+			this.baseComparator = comparator;
+			}
+
+		@Override public int compare(E o1, E o2) 
+		{	
+			count += 1;
+			if (count % cancelTestFrequency == 0) 
+			{
+				if (cancelled) throw new AbandonSort();
+			}
+			return baseComparator.compare(o1, o2);
+		}
+		
+		/**
+		    Sort the array <code>e</code> using this comparator
+		 	with the additional ability to abort the sort.
+		*/
+		public boolean abortableSort(E[] e) {
+			try { Arrays.sort(e, this); }
+			catch (AbandonSort s) { return true; }
+			return false;
+		}
+	}
+
+    /**
+        <code>AbandonSort</code> is the exception thrown from
+        <code>CanAbortComparator</code> to abandon a sort.
+    */
+	public static class AbandonSort extends RuntimeException 
+    {
+		private static final long serialVersionUID = 1L;
+    }
+    
+	/**
+	    cancel arranges that further comparisons using the supplied
+	    comparator will abandon the sort in progress.
+	*/
+	public void cancel() 
+	{
+		cancelled = true;
+	}
+    
     protected void checkClosed()
     {
         if (closed) throw new AtlasException("SortedDataBag is closed, no operations can be performed on it.") ;
@@ -118,7 +180,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
         }
     }
     
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings({ "unchecked" })
     protected void spill()
     {
         // Make sure we have something to spill.
@@ -134,25 +196,26 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
                 throw new AtlasException(e);
             }
             
-            // Sort the tuples
-            // Collections.sort() will copy to an array, sort, and then copy back.  Avoid that
-            // extra copy by copying to an array and using Arrays.sort().  Also it lets us use
-            // Collection<E> instead of List<E> as the type for the memory object.  Unfortunately
-            // because of Java's crazy generics we have to do it as an Object array.
-            Object[] array = memory.toArray();
-            Arrays.sort(array, (Comparator)comparator);
+            // Sort the tuples as an array. The CanAbortComparator will sort that
+            // array using Arrays.sort. The cast to E[] is safe. If the sort is
+            // aborted, don't bother messing around with the serialisation. We'll
+            // never get around to using it anyway.
             
-            Sink<E> serializer = serializationFactory.createSerializer(out);
-            try
+            E[] array = (E[]) memory.toArray();
+            if (!comparator.abortableSort(array)) 
             {
-                for (Object tuple : array)
-                {
-                    serializer.send((E)tuple);
-                }
-            }
-            finally
-            {
-                serializer.close();
+	            Sink<E> serializer = serializationFactory.createSerializer(out);
+	            try
+	            {
+	                for (Object tuple : array)
+	                {
+	                    serializer.send((E)tuple);
+	                }
+	            }
+	            finally
+	            {
+	                serializer.close();
+	            }
             }
             
             spilled = true;
@@ -189,7 +252,7 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
         return iterator(getSpillFiles().size());
     }
 
-    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @SuppressWarnings({ "unchecked" })
     private Iterator<E> iterator(int size)
     {
         checkClosed();
@@ -200,9 +263,13 @@ public class SortedDataBag<E> extends AbstractDataBag<E>
         if (!finishedAdding && memSize > 1)
         {
             // Again, some ugliness for speed
-            Object[] array = memory.toArray();
-            Arrays.sort(array, (Comparator)comparator);
-            memory = Arrays.asList((E[])array);
+            E[] array = (E[]) memory.toArray();
+            if (comparator.abortableSort(array)) 
+            {
+            	// if we comment this back in, we lose the timeout message!
+            	// return Iter.nullIterator();
+            }
+            memory = Arrays.asList(array);
         }
         
         finishedAdding = true;
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
index ecba3a9..2593f09 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/engine/iterator/QueryIterSort.java
@@ -67,6 +67,8 @@ public class QueryIterSort extends QueryIterPlainWrapper
     @Override
     public void requestCancel()
     {
+    	System.err.println(">> QueryIterSort.requestCancel()");
+    	this.db.cancel();
         this.embeddedIterator.cancel() ;
         super.requestCancel() ;
     }
diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java
index 4fa1249..60251db 100644
--- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java
+++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/cmd/FusekiCmd.java
@@ -64,7 +64,8 @@ public class FusekiCmd {
     }
 
     static public void main(String... argv) {
-        FusekiCmdInner.innerMain(argv);
+    	String [] args = new String[] {"--config=/home/chris/Fuseki/apache-jena-fuseki-2.4.0/hacked-config.ttl"};
+        FusekiCmdInner.innerMain(args);
     }
     
     static class FusekiCmdInner extends CmdARQ {

Reply via email to