This is an automated email from the ASF dual-hosted git repository. andy pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/jena.git
commit 711fe075a01adc1ab152c86cb36a68318fb8bca7 Author: Andy Seaborne <[email protected]> AuthorDate: Thu May 30 14:44:44 2024 +0100 GH-2506: Intercept and check HttpAction transaction lifecycle --- .../org/apache/jena/sparql/core/Transactional.java | 3 +- .../org/apache/jena/fuseki/server/DataService.java | 8 +- .../jena/fuseki/server/OperationRegistry.java | 3 +- .../org/apache/jena/fuseki/servlets/GSP_RW.java | 6 +- .../apache/jena/fuseki/servlets/HttpAction.java | 223 +++++++++++++++++---- .../PatchApply.java} | 12 +- .../apache/jena/fuseki/servlets/SPARQL_Update.java | 2 +- .../org/apache/jena/fuseki/servlets/UploadRDF.java | 4 +- .../fuseki/main/TestFusekiCustomOperation.java | 2 +- .../jena/rdfpatch/system/DatasetGraphChanges.java | 6 +- 10 files changed, 203 insertions(+), 66 deletions(-) diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java b/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java index 19e9c94b6d..c6c1370522 100644 --- a/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java +++ b/jena-arq/src/main/java/org/apache/jena/sparql/core/Transactional.java @@ -190,7 +190,8 @@ public interface Transactional /** Finish the transaction - if a write transaction and commit() has not been called, then abort */ public void end() ; - /** Return the current mode of the transaction - "read" or "write". + /** + * Return the current mode of the transaction - "read" or "write". * If the caller is not in a transaction, this method returns null. 
*/ public ReadWrite transactionMode(); diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java index 3ecb06aba1..7d823180b6 100644 --- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/DataService.java @@ -239,6 +239,10 @@ public class DataService { totalTxn.getAndIncrement(); } + public void finishTxn() { + activeTxn.decrementAndGet(); + } + private void check(DataServiceStatus status) { if ( state != status ) { String msg = format("DataService %s: Expected=%s, Actual=%s", label(), status, state); @@ -246,10 +250,6 @@ public class DataService { } } - public void finishTxn() { - activeTxn.decrementAndGet(); - } - /** Shutdown and never use again. */ public synchronized void shutdown() { if ( state == CLOSING ) diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java index bbfb61377a..7038c82f78 100644 --- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/server/OperationRegistry.java @@ -26,7 +26,6 @@ import jakarta.servlet.ServletContext; import org.apache.jena.atlas.logging.Log; import org.apache.jena.fuseki.Fuseki; -import org.apache.jena.fuseki.patch.PatchApplyService; import org.apache.jena.fuseki.servlets.*; import org.apache.jena.riot.WebContent; @@ -47,7 +46,7 @@ public class OperationRegistry { private static final ActionService uploadServlet = new UploadRDF(); private static final ActionService gspServlet_R = new GSP_R(); private static final ActionService gspServlet_RW = new GSP_RW(); - private static final ActionService 
rdfPatch = new PatchApplyService(); + private static final ActionService rdfPatch = new PatchApply(); private static final ActionService noOperation = new NoOpActionService(); private static final ActionService shaclValidation = new SHACL_Validation(); diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java index fd8a0325e6..5bb228e5c4 100644 --- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/GSP_RW.java @@ -105,7 +105,7 @@ public class GSP_RW extends GSP_R { } catch (ActionErrorException ex) { throw ex; } catch (Exception ex) { action.abortSilent(); } - finally { action.end(); } + finally { action.endWrite(); } ServletOps.successNoContent(action); } @@ -215,7 +215,7 @@ public class GSP_RW extends GSP_R { ServletOps.errorOccurred(ex.getMessage()); return null; } finally { - action.end(); + action.endWrite(); } } @@ -261,6 +261,6 @@ public class GSP_RW extends GSP_R { action.abortSilent(); ServletOps.errorOccurred(ex.getMessage()); return null; - } finally { action.end(); } + } finally { action.endWrite(); } } } diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java index dc3bfd89d5..5a5d1b7a34 100644 --- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/HttpAction.java @@ -147,7 +147,6 @@ public class HttpAction * @param dService {@link DataService} * @see Transactional */ - public void setRequest(DataAccessPoint dataAccessPoint, DataService dService) { if ( this.dataAccessPoint != null ) throw new FusekiException("Redefinition of 
DataAccessPoint in the request action"); @@ -219,7 +218,6 @@ public class HttpAction /** * Return the "Transactional" for this HttpAction. */ - public Transactional getTransactional() { return transactional; } @@ -232,7 +230,8 @@ public class HttpAction return actionURI; } - /** Get the context path. + /** + * Get the context path. */ public String getContextPath() { return contextPath; @@ -252,7 +251,8 @@ public class HttpAction return dataAccessPointRegistry; } - /** Set the endpoint and endpoint name that this is an action for. + /** + * Set the endpoint and endpoint name that this is an action for. * @param endpoint {@link Endpoint} */ public void setEndpoint(Endpoint endpoint) { @@ -273,64 +273,207 @@ public class HttpAction return isTransactional; } + public final void startRequest() { + if ( dataAccessPoint != null ) + dataAccessPoint.startRequest(this); + } + + public final void finishRequest() { + // Should be handled where necessary in the request handling. +// if ( inputStream != null ) { +// ActionLib.consumeBody(this); +// } +// if ( outputStream != null ) { +// IO.flush(outputStream); +// IO.close(outputStream); +// } + if ( dataAccessPoint != null ) + dataAccessPoint.finishRequest(this); + if ( isInActionTxn ) + FmtLog.error(log, "[%d] Action transaction not finished", this.id); + } + + /** + * Begin a transaction of any {@link TxnType} - this should be paired with an {@link #end()} + * + * <pre> + * httpAction.begin(); + * try { + * work + * possible promote to write + * commit or abort (if write) + * response + * } finally { httpAction.end(); } + * </pre> + * + * Prefer one of {@link #beginRead()} or {@link #beginWrite()} if the action is + * known to be a read or write at the start or call {@link #begin()} + * if the transaction may promote. 
+ * + * + */ public void begin(TxnType txnType) { + enterActionTxn(); if ( transactional != null ) transactional.begin(txnType); activeDSG = dsg; if ( dataService != null ) dataService.startTxn(txnType); + startActionTxn(); } + private /*public*/ void endInternal() { + // Commit or abort may have called end() already. + // Possibly multiple endWrite, endRead (not ideal, but not breaking) + if ( actionTxEndCalled ) + return; + actionTxEndCalled = true; + finishActionTxn(); + if ( dataService != null ) + dataService.finishTxn(); + if ( transactional.isInTransaction() ) { + switch(transactional.transactionMode() ) { + case READ -> {} + case WRITE -> { + // Write transactions must have explicitly called commit or abort. + FmtLog.warn(log, "[%d] Transaction still active - no commit or abort seen (forced abort)", this.id); + try { + transactional.abort(); + } catch (RuntimeException ex) { + FmtLog.warn(log, "[%d] Exception in forced abort (trying to continue)", this.id, ex); + } + } + } + try { transactional.end(); } + catch (RuntimeException ex) {} + } + leaveActionTxn(); + } + + /** + * Begin a transaction - this should be paired with an {@link #end()} + * <pre> + * httpAction.begin(); + * try { + * work + * possible promote to write + * commit or abort (if write) + * response + * } finally { httpAction.end(); } + * </pre> + */ public void begin() { begin(READ_PROMOTE); } + /** + * Begin a write operation - this should be paired with an {@link #endWrite()} + * <pre> + * httpAction.beginWrite(); + * try { + * work + * commit or abort + * response + * } finally { httpAction.endWrite(); } + * </pre> + */ public void beginWrite() { begin(WRITE); } + /** + * Begin a read operation - this should be paired with an {@link #endRead()} + * <pre> + * httpAction.beginRead(); + * try { + * work + * response + * } finally { httpAction.endRead(); } + * </pre> + */ public void beginRead() { begin(READ); } + /** + * End a read transaction - paired with {@link #beginRead}. 
+ */ public void endRead() { - if ( dataService != null ) - dataService.finishTxn(); - if ( transactional != null ) { - try { transactional.commit(); } catch (RuntimeException ex) {} - try { transactional.end(); } catch (RuntimeException ex) {} - } + endInternal(); + } + + /** + * End a write transaction - paired with {@link #beginWrite}. + */ + public void endWrite() { + endInternal(); } + /** + * End a transaction - paired with {@link #begin()} or {@link #begin(TxnType)}. + */ public void end() { - dataService.finishTxn(); - if ( transactional.isInTransaction() ) { - FmtLog.warn(log, "[%d] Transaction still active - no commit or abort seen (forced abort)", this.id); - try { - transactional.abort(); - } catch (RuntimeException ex) { - FmtLog.warn(log, "[%d] Exception in forced abort (trying to continue)", this.id, ex); - } - } - if ( transactional.isInTransaction() ) { - try { transactional.end(); } - catch (RuntimeException ex) {} + endInternal(); + } + + // An action begin-end is on the same thread. No concurrency issues for this flag. + private boolean isInActionTxn = false; + private int actionTxnCount = 0; + /** Used to detect two calls to {@link #endInternal} */ + private boolean actionTxEndCalled = false; + + // begin :: enter then start + // end :: finish then leave + + /** + * Called on entry to {@link #begin(TxnType)}; paired with {@link #leaveActionTxn}. + * {@code transaction.isInTransaction()} is false. + */ + private void enterActionTxn() { + if ( isInActionTxn ) { + FmtLog.warn(log, "[%d] Already in an action", this.id); + ServletOps.errorOccurred("Internal error: bad action handling"); } - endOfAction(); + if ( actionTxnCount != 0 ) + FmtLog.error(log, "[%d] Enter: actionTxnCount = %d", this.id, actionTxnCount); + } + + /** + * Called on exit from {@link #end()}; paired with {@link #enterActionTxn}. + * {@code transaction.isInTransaction()} is false. 
+ */ + private void leaveActionTxn() { + if ( actionTxnCount != 0 ) + FmtLog.error(log, "[%d] Leave: actionTxnCount = %d", this.id, actionTxnCount); + if ( isInActionTxn ) + FmtLog.error(log, "[%d] Action transaction not end'ed.", this.id); } - private void endOfAction() { + /** + * Called on leaving {@link #begin(TxnType)}; paired with {@link #finishActionTxn}. + * {@code transaction.isInTransaction()} is true. + */ + private void startActionTxn() { + isInActionTxn = true; + actionTxnCount++; + actionTxEndCalled = false; + } + + /** + * Called on exit from {@link #end()}; paired with {@link #startActionTxn}. + * {@code transaction.isInTransaction()} is false. + */ + private void finishActionTxn() { + if ( ! isInActionTxn ) + // Double end? + FmtLog.warn(log, "[%d] end called - Not in an action", this.id); activeDSG = null; - // Should be handled where necessary in the request handling. -// if ( inputStream != null ) { -// ActionLib.consumeBody(this); -// } -// if ( outputStream != null ) { -// IO.flush(outputStream); -// IO.close(outputStream); -// } + isInActionTxn = false; + actionTxnCount--; } + // ---- + public void commit() { dataService.finishTxn(); transactional.commit(); @@ -358,16 +501,6 @@ public class HttpAction end(); } - public final void startRequest() { - if ( dataAccessPoint != null ) - dataAccessPoint.startRequest(this); - } - - public final void finishRequest() { - if ( dataAccessPoint != null ) - dataAccessPoint.finishRequest(this); - } - /** If inside the transaction for the action, return the active {@link DatasetGraph}, * otherwise return null. * @return Current active {@link DatasetGraph} @@ -400,7 +533,8 @@ public class HttpAction // this.datasetName = datasetName; // } - /** Reduce to a size that can be kept around for sometime. + /** + * Reduce to a size that can be kept around for sometime. 
*/ public void minimize() { this.request = null; @@ -525,7 +659,8 @@ public class HttpAction return inputStream; } - /** Get the request input stream, bypassing any compression. + /** + * Get the request input stream, bypassing any compression. * The state of the input stream is unknown. * Only useful for skipping a body on a connection. * @throws IOException diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/patch/PatchApplyService.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/PatchApply.java similarity index 97% rename from jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/patch/PatchApplyService.java rename to jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/PatchApply.java index 4a06fc8474..1481a57505 100644 --- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/patch/PatchApplyService.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/PatchApply.java @@ -15,7 +15,7 @@ * information regarding copyright ownership. */ -package org.apache.jena.fuseki.patch; +package org.apache.jena.fuseki.servlets; import static java.lang.String.format; import static org.apache.jena.fuseki.servlets.ActionExecLib.incCounter; @@ -25,7 +25,6 @@ import java.io.InputStream; import org.apache.jena.atlas.web.ContentType; import org.apache.jena.fuseki.server.CounterName; -import org.apache.jena.fuseki.servlets.*; import org.apache.jena.rdfpatch.PatchException; import org.apache.jena.rdfpatch.RDFChanges; import org.apache.jena.rdfpatch.changes.*; @@ -37,7 +36,7 @@ import org.apache.jena.sparql.core.DatasetGraph; import org.apache.jena.web.HttpSC; /** A Fuseki service to receive and apply a patch. 
*/ -public class PatchApplyService extends ActionREST { +public class PatchApply extends ActionREST { static CounterName counterPatches = CounterName.register("RDFpatch-apply", "rdf-patch.apply.requests"); static CounterName counterPatchesGood = CounterName.register("RDFpatch-apply", "rdf-patch.apply.good"); static CounterName counterPatchesBad = CounterName.register("RDFpatch-apply", "rdf-patch.apply.bad"); @@ -46,7 +45,7 @@ public class PatchApplyService extends ActionREST { private ContentType ctPatchText = WebContent.ctPatch; private ContentType ctPatchBinary = WebContent.ctPatchThrift; - public PatchApplyService() { + public PatchApply() { // Counters: the standard ActionREST counters per operation are enough. } @@ -111,7 +110,10 @@ public class PatchApplyService extends ActionREST { catch (Exception ex) { action.abort(); throw ex; - } finally { action.end(); } + } + finally { + action.endWrite(); + } ServletOps.success(action); } diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java index d760495308..8c50cddfd5 100644 --- a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/SPARQL_Update.java @@ -258,7 +258,7 @@ public class SPARQL_Update extends ActionService abortSilent(action); ServletOps.errorOccurred(ex.getMessage(), ex); } - } finally { action.end(); } + } finally { action.endWrite(); } } /* [It is an error to supply the using-graph-uri or using-named-graph-uri parameters diff --git a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java index 2e336932ed..24de0d7b94 100644 --- 
a/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java +++ b/jena-fuseki2/jena-fuseki-core/src/main/java/org/apache/jena/fuseki/servlets/UploadRDF.java @@ -138,7 +138,7 @@ public class UploadRDF extends ActionREST { action.abortSilent(); ServletOps.errorOccurred(ex.getMessage()); } finally { - action.end(); + action.endWrite(); } return details; } @@ -192,7 +192,7 @@ public class UploadRDF extends ActionREST { action.abortSilent(); ServletOps.errorOccurred(ex.getMessage()); } finally { - action.end(); + action.endWrite(); } return details; } diff --git a/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java b/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java index 6d70a91da9..c7dba503ff 100644 --- a/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java +++ b/jena-fuseki2/jena-fuseki-main/src/test/java/org/apache/jena/fuseki/main/TestFusekiCustomOperation.java @@ -182,7 +182,7 @@ public class TestFusekiCustomOperation { // .build(); } - // Bad test: no MIME type must match. + // Bad test: MIME type must match. @Test public void cfg_bad_ct_not_enabled_here_1() { FusekiServer server = FusekiServer.create().port(port) diff --git a/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java b/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java index 00bdc77690..af2946e377 100644 --- a/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java +++ b/jena-rdfpatch/src/main/java/org/apache/jena/rdfpatch/system/DatasetGraphChanges.java @@ -263,7 +263,7 @@ public class DatasetGraphChanges extends DatasetGraphWrapper { public void commit() { // Assume local commit will work - so signal first. 
// If the monitor.txnCommit fails, the commit should not happen - if ( isWriteMode() ) { + if ( isInWriteMode() ) { try { changesMonitor.txnCommit(); } catch (Exception ex) { @@ -279,12 +279,12 @@ public class DatasetGraphChanges extends DatasetGraphWrapper { @Override public void abort() { // Assume abort will work - signal first. - if ( isWriteMode() ) + if ( isInWriteMode() ) changesMonitor.txnAbort(); super.abort(); } - private boolean isWriteMode() { + private boolean isInWriteMode() { return super.transactionMode() == ReadWrite.WRITE; }
