This is an automated email from the ASF dual-hosted git repository.

andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git

commit 711fe075a01adc1ab152c86cb36a68318fb8bca7
Author: Andy Seaborne <[email protected]>
AuthorDate: Thu May 30 14:44:44 2024 +0100

    GH-2506: Intercept and check HttpAction transaction lifecycle
---
 .../org/apache/jena/sparql/core/Transactional.java |   3 +-
 .../org/apache/jena/fuseki/server/DataService.java |   8 +-
 .../jena/fuseki/server/OperationRegistry.java      |   3 +-
 .../org/apache/jena/fuseki/servlets/GSP_RW.java    |   6 +-
 .../apache/jena/fuseki/servlets/HttpAction.java    | 223 +++++++++++++++++----
 .../PatchApply.java}                               |  12 +-
 .../apache/jena/fuseki/servlets/SPARQL_Update.java |   2 +-
 .../org/apache/jena/fuseki/servlets/UploadRDF.java |   4 +-
 .../fuseki/main/TestFusekiCustomOperation.java     |   2 +-
 .../jena/rdfpatch/system/DatasetGraphChanges.java  |   6 +-
 10 files changed, 203 insertions(+), 66 deletions(-)

diff --git 
a/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java 
b/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java
index 19e9c94b6d..c6c1370522 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java
@@ -190,7 +190,8 @@ public interface Transactional
     /** Finish the transaction - if a write transaction and commit() has not 
been called, then abort */
     public void end() ;
 
-    /** Return the current mode of the transaction - "read" or "write".
+    /**
+     * Return the current mode of the transaction - "read" or "write".
      * If the caller is not in a transaction, this method returns null.
      */
     public ReadWrite transactionMode();
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java
index 3ecb06aba1..7d823180b6 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java
@@ -239,6 +239,10 @@ public class DataService {
         totalTxn.getAndIncrement();
     }
 
+    public void finishTxn() {
+        activeTxn.decrementAndGet();
+    }
+
     private void check(DataServiceStatus status) {
         if ( state != status ) {
             String msg = format("DataService %s: Expected=%s, Actual=%s", 
label(), status, state);
@@ -246,10 +250,6 @@ public class DataService {
         }
     }
 
-    public void finishTxn() {
-        activeTxn.decrementAndGet();
-    }
-
     /** Shutdown and never use again. */
     public synchronized void shutdown() {
         if ( state == CLOSING )
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java
index bbfb61377a..7038c82f78 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java
@@ -26,7 +26,6 @@ import jakarta.servlet.ServletContext;
 
 import org.apache.jena.atlas.logging.Log;
 import org.apache.jena.fuseki.Fuseki;
-import org.apache.jena.fuseki.patch.PatchApplyService;
 import org.apache.jena.fuseki.servlets.*;
 import org.apache.jena.riot.WebContent;
 
@@ -47,7 +46,7 @@ public class OperationRegistry {
     private static final ActionService uploadServlet   = new UploadRDF();
     private static final ActionService gspServlet_R    = new GSP_R();
     private static final ActionService gspServlet_RW   = new GSP_RW();
-    private static final ActionService rdfPatch        = new 
PatchApplyService();
+    private static final ActionService rdfPatch        = new PatchApply();
     private static final ActionService noOperation     = new 
NoOpActionService();
     private static final ActionService shaclValidation = new 
SHACL_Validation();
 
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java
index fd8a0325e6..5bb228e5c4 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java
@@ -105,7 +105,7 @@ public class GSP_RW extends GSP_R {
         }
         catch (ActionErrorException ex) { throw ex; }
         catch (Exception ex) { action.abortSilent(); }
-        finally { action.end(); }
+        finally { action.endWrite(); }
         ServletOps.successNoContent(action);
     }
 
@@ -215,7 +215,7 @@ public class GSP_RW extends GSP_R {
             ServletOps.errorOccurred(ex.getMessage());
             return null;
         } finally {
-            action.end();
+            action.endWrite();
         }
     }
 
@@ -261,6 +261,6 @@ public class GSP_RW extends GSP_R {
             action.abortSilent();
             ServletOps.errorOccurred(ex.getMessage());
             return null;
-        } finally { action.end(); }
+        } finally { action.endWrite(); }
     }
 }
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
index dc3bfd89d5..5a5d1b7a34 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java
@@ -147,7 +147,6 @@ public class HttpAction
      * @param dService {@link DataService}
      * @see Transactional
      */
-
     public void setRequest(DataAccessPoint dataAccessPoint, DataService 
dService) {
         if ( this.dataAccessPoint != null )
             throw new FusekiException("Redefinition of DataAccessPoint in the 
request action");
@@ -219,7 +218,6 @@ public class HttpAction
     /**
      * Return the "Transactional" for this HttpAction.
      */
-
     public Transactional getTransactional() {
         return transactional;
     }
@@ -232,7 +230,8 @@ public class HttpAction
         return actionURI;
     }
 
-    /** Get the context path.
+    /**
+     * Get the context path.
      */
     public String getContextPath() {
         return contextPath;
@@ -252,7 +251,8 @@ public class HttpAction
         return dataAccessPointRegistry;
     }
 
-    /** Set the endpoint and endpoint name that this is an action for.
+    /**
+     * Set the endpoint and endpoint name that this is an action for.
      * @param endpoint {@link Endpoint}
      */
     public void setEndpoint(Endpoint endpoint) {
@@ -273,64 +273,207 @@ public class HttpAction
         return isTransactional;
     }
 
+    public final void startRequest() {
+        if ( dataAccessPoint != null )
+            dataAccessPoint.startRequest(this);
+    }
+
+    public final void finishRequest() {
+        // Should be handled where necessary in the request handling.
+//      if ( inputStream != null ) {
+//          ActionLib.consumeBody(this);
+//      }
+//      if ( outputStream != null ) {
+//          IO.flush(outputStream);
+//          IO.close(outputStream);
+//      }
+        if ( dataAccessPoint != null )
+            dataAccessPoint.finishRequest(this);
+        if ( isInActionTxn )
+            FmtLog.error(log, "[%d] Action transaction not finished", this.id);
+    }
+
+    /**
+     * Begin a transaction of any {@link TxnType} - this should be paired with 
an {@link #end()}
+     *
+     * <pre>
+     *   httpAction.begin();
+     *   try {
+     *      work
+     *      possible promote to write
+     *      commit or abort (if write)
+     *      response
+     *   } finally { httpAction.end(); }
+     * </pre>
+     *
+     * Prefer one of {@link #beginRead()} or {@link #beginWrite()} if the 
action is
+     * known to be a a read or write at the start or call {@link #begin()}
+     * if the transaction may promote.
+     *
+     *
+     */
     public void begin(TxnType txnType) {
+        enterActionTxn();
         if ( transactional != null )
             transactional.begin(txnType);
         activeDSG = dsg;
         if ( dataService != null )
             dataService.startTxn(txnType);
+        startActionTxn();
     }
 
+    private /*public*/ void endInternal() {
+        // Commit or abort may have called end() already.
+        // Possibly multiple endWrite, endRead (not ideal, but not breaking)
+        if ( actionTxEndCalled )
+            return;
+        actionTxEndCalled = true;
+        finishActionTxn();
+        if ( dataService != null )
+            dataService.finishTxn();
+        if ( transactional.isInTransaction() ) {
+            switch(transactional.transactionMode() ) {
+                case READ -> {}
+                case WRITE -> {
+                    // Write transactions must have explicitly called commit 
or abort.
+                    FmtLog.warn(log, "[%d] Transaction still active - no 
commit or abort seen (forced abort)", this.id);
+                    try {
+                        transactional.abort();
+                    } catch (RuntimeException ex) {
+                        FmtLog.warn(log, "[%d] Exception in forced abort 
(trying to continue)", this.id, ex);
+                    }
+                }
+            }
+            try { transactional.end(); }
+            catch (RuntimeException ex) {}
+        }
+        leaveActionTxn();
+    }
+
+    /**
+     * Begin a transaction - this should be paired with an {@link #end()}
+     * <pre>
+     *   httpAction.begin();
+     *   try {
+     *      work
+     *      possible promote to write
+     *      commit or abort (if write)
+     *      response
+     *   } finally { httpAction.end(); }
+     *   </pre>
+     */
     public void begin() {
         begin(READ_PROMOTE);
     }
 
+    /**
+     *  Begin a write operation - this should be paired with an {@link 
#endWrite()}
+     * <pre>
+     *   httpAction.beginWrite();
+     *   try {
+     *      work
+     *      commit or abort
+     *      response
+     *   } finally { httpAction.endWrite(); }
+     *   </pre>
+     */
     public void beginWrite() {
         begin(WRITE);
     }
 
+    /**
+     *  Begin a read operation - this should be paired with an {@link 
#endRead()}
+     * <pre>
+     *   httpAction.beginRead();
+     *   try {
+     *      work
+     *      response
+     *   } finally { httpAction.endRead(); }
+     *   </pre>
+     */
     public void beginRead() {
         begin(READ);
     }
 
+    /**
+     * End a read transaction - paired with {@link #beginRead}.
+     */
     public void endRead() {
-        if ( dataService != null )
-            dataService.finishTxn();
-        if ( transactional != null ) {
-            try { transactional.commit(); } catch (RuntimeException ex) {}
-            try { transactional.end(); } catch (RuntimeException ex) {}
-        }
+        endInternal();
+    }
+
+    /**
+     * End a write transaction - paired with {@link #beginWrite}.
+     */
+    public void endWrite() {
+        endInternal();
     }
 
+    /**
+     * End a write transaction - paired with {@link #begin()} or {@link 
#begin(TxnType)}.
+     */
     public void end() {
-        dataService.finishTxn();
-        if ( transactional.isInTransaction() ) {
-            FmtLog.warn(log, "[%d] Transaction still active - no commit or 
abort seen (forced abort)", this.id);
-            try {
-                transactional.abort();
-            } catch (RuntimeException ex) {
-                FmtLog.warn(log, "[%d] Exception in forced abort (trying to 
continue)", this.id, ex);
-            }
-        }
-        if ( transactional.isInTransaction() ) {
-            try { transactional.end(); }
-            catch (RuntimeException ex) {}
+        endInternal();
+    }
+
+    // An action begin-end is on the same thread. No concurrency issues for 
this flag.
+    private boolean isInActionTxn = false;
+    private int actionTxnCount = 0;
+    /** Used to detect two calls to {@link #endAny} */
+    private boolean actionTxEndCalled = false;
+
+    // begin :: enter then start
+    // end  ::  finish then leave
+
+    /**
+     * Called on entry to {@link #begin(TxnType)}; paired with {@link 
#leaveActionTxn}.
+     * {@code transaction.isInTransaction()} is false.
+     */
+    private void enterActionTxn() {
+        if ( isInActionTxn ) {
+            FmtLog.warn(log, "[%d] Already in an action", this.id);
+            ServletOps.errorOccurred("Internal error: bad action handling");
         }
-        endOfAction();
+        if ( actionTxnCount != 0 )
+            FmtLog.error(log, "[%d] Enter: actionTxnCount = %d", this.id, 
actionTxnCount);
+    }
+
+    /**
+     * Called on exit from {@link #end()}; paired with {@link #enterActionTxn}.
+     * {@code transaction.isInTransaction()} is false.
+     */
+    private void leaveActionTxn() {
+        if ( actionTxnCount != 0 )
+            FmtLog.error(log, "[%d] Leave: actionTxnCount = %d", this.id, 
actionTxnCount);
+        if ( isInActionTxn )
+            FmtLog.error(log, "[%d] Action transaction not end'ed.", this.id);
     }
 
-    private void endOfAction() {
+    /**
+     * Called on leaving {@link #begin(TxnType)}; paired with {@link 
#leaveActionTxn}.
+     * {@code transaction.isInTransaction()} is true.
+     */
+    private void startActionTxn() {
+        isInActionTxn = true;
+        actionTxnCount++;
+        actionTxEndCalled = false;
+    }
+
+    /**
+     * Called on exit from {@link #end()}; paired with {@link #startActionTxn}.
+     * {@code transaction.isInTransaction()} is false.
+     */
+    private void finishActionTxn() {
+        if ( ! isInActionTxn )
+            // Double end?
+            FmtLog.warn(log, "[%d] end called - Not in an action", this.id);
         activeDSG = null;
-        // Should be handled where necessary in the request handling.
-//        if ( inputStream != null ) {
-//            ActionLib.consumeBody(this);
-//        }
-//        if ( outputStream != null ) {
-//            IO.flush(outputStream);
-//            IO.close(outputStream);
-//        }
+        isInActionTxn = false;
+        actionTxnCount--;
     }
 
+    // ----
+
     public void commit() {
         dataService.finishTxn();
         transactional.commit();
@@ -358,16 +501,6 @@ public class HttpAction
         end();
     }
 
-    public final void startRequest() {
-        if ( dataAccessPoint != null )
-            dataAccessPoint.startRequest(this);
-    }
-
-    public final void finishRequest() {
-        if ( dataAccessPoint != null )
-            dataAccessPoint.finishRequest(this);
-    }
-
     /** If inside the transaction for the action, return the active {@link 
DatasetGraph},
      *  otherwise return null.
      * @return Current active {@link DatasetGraph}
@@ -400,7 +533,8 @@ public class HttpAction
 //        this.datasetName = datasetName;
 //    }
 
-    /** Reduce to a size that can be kept around for sometime.
+    /**
+     * Reduce to a size that can be kept around for sometime.
      */
     public void minimize() {
         this.request = null;
@@ -525,7 +659,8 @@ public class HttpAction
         return inputStream;
     }
 
-    /** Get the request input stream, bypassing any compression.
+    /**
+     * Get the request input stream, bypassing any compression.
      * The state of the input stream is unknown.
      * Only useful for skipping a body on a connection.
      * @throws IOException
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/patch/PatchApplyService.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/PatchApply.java
similarity index 97%
rename from 
jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/patch/PatchApplyService.java
rename to 
jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/PatchApply.java
index 4a06fc8474..1481a57505 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/patch/PatchApplyService.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/PatchApply.java
@@ -15,7 +15,7 @@
  *  information regarding copyright ownership.
  */
 
-package org.apache.jena.fuseki.patch;
+package org.apache.jena.fuseki.servlets;
 
 import static java.lang.String.format;
 import static org.apache.jena.fuseki.servlets.ActionExecLib.incCounter;
@@ -25,7 +25,6 @@ import java.io.InputStream;
 
 import org.apache.jena.atlas.web.ContentType;
 import org.apache.jena.fuseki.server.CounterName;
-import org.apache.jena.fuseki.servlets.*;
 import org.apache.jena.rdfpatch.PatchException;
 import org.apache.jena.rdfpatch.RDFChanges;
 import org.apache.jena.rdfpatch.changes.*;
@@ -37,7 +36,7 @@ import org.apache.jena.sparql.core.DatasetGraph;
 import org.apache.jena.web.HttpSC;
 
 /** A Fuseki service to receive and apply a patch. */
-public class PatchApplyService extends ActionREST {
+public class PatchApply extends ActionREST {
     static CounterName counterPatches     = 
CounterName.register("RDFpatch-apply", "rdf-patch.apply.requests");
     static CounterName counterPatchesGood = 
CounterName.register("RDFpatch-apply", "rdf-patch.apply.good");
     static CounterName counterPatchesBad  = 
CounterName.register("RDFpatch-apply", "rdf-patch.apply.bad");
@@ -46,7 +45,7 @@ public class PatchApplyService extends ActionREST {
     private ContentType ctPatchText   = WebContent.ctPatch;
     private ContentType ctPatchBinary = WebContent.ctPatchThrift;
 
-    public PatchApplyService() {
+    public PatchApply() {
         // Counters: the standard ActionREST counters per operation are enough.
     }
 
@@ -111,7 +110,10 @@ public class PatchApplyService extends ActionREST {
         catch (Exception ex) {
             action.abort();
             throw ex;
-        } finally { action.end(); }
+        }
+        finally {
+            action.endWrite();
+        }
         ServletOps.success(action);
     }
 
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
index d760495308..8c50cddfd5 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java
@@ -258,7 +258,7 @@ public class SPARQL_Update extends ActionService
                 abortSilent(action);
                 ServletOps.errorOccurred(ex.getMessage(), ex);
             }
-        } finally { action.end(); }
+        } finally { action.endWrite(); }
     }
 
     /* [It is an error to supply the using-graph-uri or using-named-graph-uri 
parameters
diff --git 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java
 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java
index 2e336932ed..24de0d7b94 100644
--- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java
+++ 
b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java
@@ -138,7 +138,7 @@ public class UploadRDF extends ActionREST {
             action.abortSilent();
             ServletOps.errorOccurred(ex.getMessage());
         } finally {
-            action.end();
+            action.endWrite();
         }
         return details;
     }
@@ -192,7 +192,7 @@ public class UploadRDF extends ActionREST {
             action.abortSilent();
             ServletOps.errorOccurred(ex.getMessage());
         } finally {
-            action.end();
+            action.endWrite();
         }
         return details;
     }
diff --git 
a/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java
 
b/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java
index 6d70a91da9..c7dba503ff 100644
--- 
a/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java
+++ 
b/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java
@@ -182,7 +182,7 @@ public class TestFusekiCustomOperation {
         // .build();
     }
 
-    // Bad test: no MIME type must match.
+    // Bad test: MIME type must match.
     @Test
     public void cfg_bad_ct_not_enabled_here_1() {
         FusekiServer server = FusekiServer.create().port(port)
diff --git 
a/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java
 
b/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java
index 00bdc77690..af2946e377 100644
--- 
a/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java
+++ 
b/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java
@@ -263,7 +263,7 @@ public class DatasetGraphChanges extends 
DatasetGraphWrapper {
     public void commit() {
         // Assume local commit will work - so signal first.
         // If the monitor.txnCommit fails, the commit should not happen
-        if ( isWriteMode() ) {
+        if ( isInWriteMode() ) {
             try {
                 changesMonitor.txnCommit();
             } catch (Exception ex) {
@@ -279,12 +279,12 @@ public class DatasetGraphChanges extends 
DatasetGraphWrapper {
     @Override
     public void abort() {
         // Assume abort will work - signal first.
-        if ( isWriteMode() )
+        if ( isInWriteMode() )
             changesMonitor.txnAbort();
         super.abort();
     }
 
-    private boolean isWriteMode() {
+    private boolean isInWriteMode() {
         return super.transactionMode() == ReadWrite.WRITE;
     }
 

Reply via email to