Author: andy
Date: Thu Apr 18 21:01:18 2013
New Revision: 1469571

URL: http://svn.apache.org/r1469571
Log:
Cope with timeout1 happening just after the first row is seen but not returned.
Sync between abort and timeout2 setting via private lock object.
Separate out the wrapper iterator because it's becoming more complicated.

Modified:
    
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java

Modified: 
jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java
URL: 
http://svn.apache.org/viewvc/jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java?rev=1469571&r1=1469570&r2=1469571&view=diff
==============================================================================
--- jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java (original)
+++ jena/trunk/jena-arq/src/main/java/com/hp/hpl/jena/sparql/engine/QueryExecutionBase.java Thu Apr 18 21:01:18 2013
@@ -69,6 +69,7 @@ public class QueryExecutionBase implemen
     private Context            context ;
     private FileManager        fileManager = FileManager.get() ;
     private QuerySolution      initialBinding = null ;      
+    private Object             lockTimeout = new Object() ;     // syncrhonization.
 
     // has cancel() been called?
     private volatile boolean   cancel = false ;
@@ -151,16 +152,6 @@ public class QueryExecutionBase implemen
         }
     }
     
-    // Old, synchronous code.
-    // Delete when we are sure cancellation is stable.
-//    public void abort()
-//    {
-//        abort = true ;
-//        if ( queryIterator != null )
-//            queryIterator.abort() ;
-//        cancel = true ;
-//    }
-
     @Override
     public void close()
     {
@@ -172,18 +163,19 @@ public class QueryExecutionBase implemen
     }
 
     @Override
-    public synchronized void abort()
+    public void abort()
        {
-           // This is called asynchronously to the execution.
-        // synchronized is for coordination with other calls of .abort.
-               if ( queryIterator != null ) 
-               {
-                       // we cancel the chain of iterators, however, we do *not* close the iterators. 
-                       // That happens after the cancellation is properly over.
-                   queryIterator.cancel() ;
-                       cancel = true ;
-               }
-        cancel = true ;
+        synchronized(lockTimeout)
+        {
+            // This is called asynchronously to the execution.
+            // synchronized is for coordination with other calls of
+            // .abort and with the timeout2 reset code. 
+            if ( queryIterator != null ) 
+                // we notify the chain of iterators, however, we do *not* close the iterators. 
+                // That happens after the cancellation is properly over.
+                queryIterator.cancel() ;
+            cancel = true ;
+        }
        }
     
     @Override
@@ -419,32 +411,41 @@ public class QueryExecutionBase implemen
         // Second timeout done by wrapping the iterator.
     }
     
-    private QueryIterator initTimeout2(QueryIterator queryIterator)
+    private class QueryIteratorTimer2 extends QueryIteratorWrapper
     {
-        if ( timeout2 < 0 && timeout2 != TIMEOUT_INF )
-            return queryIterator ;
-        // Wrap with a resetter.
-        return new QueryIteratorWrapper(queryIterator)
+        public QueryIteratorTimer2(QueryIterator qIter)
         {
-            boolean resetDone = false ;
-            @Override
-            protected Binding moveToNextBinding()
-            { 
-                Binding b = super.moveToNextBinding() ;
-                //System.out.println(b) ;
-                if ( ! resetDone )
+            super(qIter) ;
+        }
+        
+        long yieldCount = 0 ;
+        boolean resetDone = false ;
+        @Override
+        protected Binding moveToNextBinding()
+        { 
+            Binding b = super.moveToNextBinding() ;
+            yieldCount++ ;
+            if ( ! resetDone )
+            {
+                // Synchronize with abort.
+                synchronized(lockTimeout)
                 {
-                    //System.out.printf("Reset timer: ==> %d\n", timeout2) ;
+                    if ( cancel )
+                        // timeout1 went off after the binding was yielded but 
+                        // before we got here.
+                        throw new QueryCancelledException() ;
+                    
+                    // Cancel timeout1?
                     if ( pingback == null )
                     {
                         if ( timeout2 > 0 )
-                            // No first timeout - finite second timeout. 
+                            // Not first timeout - finite second timeout. 
                             pingback = alarmClock.add(callback, QueryExecutionBase.this, timeout2) ;
                     }
                     else
                     {
                         // We have moved for the first time.
-                        // Reset the timer if finite timeout else cancel.
+                        // Reset the timer if finite timeout2 else cancel.
                         if ( timeout2 < 0 )
                             alarmClock.cancel(pingback) ;
                         else
@@ -452,9 +453,17 @@ public class QueryExecutionBase implemen
                     }
                     resetDone = true ;
                 }
-                return b ;
             }
-        };
+            return b ;
+        }
+    }
+    
+    private QueryIterator initTimeout2(QueryIterator queryIterator)
+    {
+        if ( timeout2 < 0 || timeout2 == TIMEOUT_INF )
+            return queryIterator ;
+        // Wrap with a resetter.
+        return new QueryIteratorTimer2(queryIterator) ;
     }
     
     private void cancelPingback()
@@ -463,8 +472,7 @@ public class QueryExecutionBase implemen
             alarmClock.cancel(pingback) ;
     }
     
-    protected final void execInit()
-    { }
+    protected void execInit() { }
 
     private ResultSet asResultSet(QueryIterator qIter)
     {


Reply via email to