This is an automated email from the ASF dual-hosted git repository.
andy pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/jena.git
The following commit(s) were added to refs/heads/main by this push:
new 807df6a9cf GH-1961: Thread-safe and consistent GraphTxn.find
new 8e6c4bd734 Merge pull request #1964 from afs/graphtxn
807df6a9cf is described below
commit 807df6a9cf75d63318ab4269876f5b58315a5fee
Author: Andy Seaborne <[email protected]>
AuthorDate: Thu Jul 20 13:14:26 2023 +0100
GH-1961: Thread-safe and consistent GraphTxn.find
---
.../org/apache/jena/sparql/graph/GraphTxn.java | 52 +++++++++++++++++++++-
.../src/main/java/org/apache/jena/system/Txn.java | 32 ++++++-------
2 files changed, 67 insertions(+), 17 deletions(-)
diff --git a/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
b/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
index 691bae5141..813d0c06b8 100644
--- a/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
+++ b/jena-arq/src/main/java/org/apache/jena/sparql/graph/GraphTxn.java
@@ -18,13 +18,19 @@
package org.apache.jena.sparql.graph;
+import java.util.List;
+
import org.apache.jena.graph.Graph;
+import org.apache.jena.graph.Node;
+import org.apache.jena.graph.Triple;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.query.TxnType;
import org.apache.jena.sparql.core.DatasetGraph;
import org.apache.jena.sparql.core.DatasetGraphFactory;
import org.apache.jena.sparql.core.Transactional;
import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
+import org.apache.jena.util.iterator.ExtendedIterator;
+import org.apache.jena.util.iterator.WrappedIterator;
/**
* In-memory, transactional graph.
@@ -32,7 +38,9 @@ import org.apache.jena.sparql.core.mem.DatasetGraphInMemory;
* @implNote
* The implementation uses the default graph of {@link DatasetGraphInMemory}.
* The graph transaction handler continues to work.
- * This class adds the {@link Transactional} to the graph itself.
+ * This class adds the {@link Transactional} to the graph itself
+ * and provides {@link ExtendedIterator ExtendedIterators} that provide
+ * read access to the data if used outside a transaction.
*/
public class GraphTxn extends GraphWrapper implements Transactional {
@@ -98,4 +106,46 @@ public class GraphTxn extends GraphWrapper implements
Transactional {
public boolean isInTransaction() {
return getT().isInTransaction();
}
+
+ private static class IteratorTxn<T> extends WrappedIterator<T> {
+
+ private final GraphTxn graph;
+ private final boolean needIterTxn;
+
+ IteratorTxn(GraphTxn graph, ExtendedIterator<T> base) {
+ super(base, true); // removeDenied.
+ this.graph = graph;
+ needIterTxn = graph.getT().isInTransaction();
+ if ( needIterTxn )
+ graph.begin(TxnType.READ);
+ }
+
+ @Override
+ public void close() {
+ if ( needIterTxn ) {
+ graph.commit();
+ graph.end();
+ }
+ }
+ }
+
+ @Override
+ public ExtendedIterator<Triple> find(Triple triple) {
+ if ( false )
+ return isolate(get().find(triple));
+ return new IteratorTxn<Triple>(this, get().find(triple));
+ }
+
+ @Override
+ public ExtendedIterator<Triple> find(Node s, Node p, Node o) {
+ if ( false )
+ return isolate(get().find(s, p, o));
+ return new IteratorTxn<Triple>(this, get().find(s, p, o));
+ }
+
+ /** Isolate by materializing the iterator. */
+ private ExtendedIterator<Triple> isolate(ExtendedIterator<Triple> iter) {
+ List<Triple> list = iter.toList();
+ return WrappedIterator.create(list.iterator());
+ }
}
diff --git a/jena-arq/src/main/java/org/apache/jena/system/Txn.java
b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
index 61d66c8e2a..603f0fe145 100644
--- a/jena-arq/src/main/java/org/apache/jena/system/Txn.java
+++ b/jena-arq/src/main/java/org/apache/jena/system/Txn.java
@@ -18,10 +18,10 @@
package org.apache.jena.system;
-import java.util.function.Supplier ;
+import java.util.function.Supplier;
import org.apache.jena.query.TxnType;
-import org.apache.jena.sparql.core.Transactional ;
+import org.apache.jena.sparql.core.Transactional;
/** Application utilities for executing code in transactions.
* <p>
@@ -69,43 +69,43 @@ public class Txn {
/** Execute application code in a transaction with the given {@link
TxnType transaction type}. */
public static <T extends Transactional> void exec(T txn, TxnType txnType,
Runnable r) {
- boolean b = txn.isInTransaction() ;
+ boolean b = txn.isInTransaction();
if ( b )
TxnOp.compatibleWithPromote(txnType, txn);
else
- txn.begin(txnType) ;
- try { r.run() ; }
+ txn.begin(txnType);
+ try { r.run(); }
catch (Throwable th) {
onThrowable(th, txn);
- throw th ;
+ throw th;
}
if ( !b ) {
if ( txn.isInTransaction() )
// May have been explicit commit or abort.
- txn.commit() ;
- txn.end() ;
+ txn.commit();
+ txn.end();
}
}
/** Execute and return a value in a transaction with the given {@link
TxnType transaction type}. */
public static <T extends Transactional, X> X calc(T txn, TxnType txnType,
Supplier<X> r) {
- boolean b = txn.isInTransaction() ;
+ boolean b = txn.isInTransaction();
if ( b )
TxnOp.compatibleWithPromote(txnType, txn);
else
- txn.begin(txnType) ;
+ txn.begin(txnType);
X x;
- try { x = r.get() ; }
+ try { x = r.get(); }
catch (Throwable th) {
onThrowable(th, txn);
- throw th ;
+ throw th;
}
if ( !b ) {
if ( txn.isInTransaction() )
// May have been explicit commit or abort.
- txn.commit() ;
- txn.end() ;
+ txn.commit();
+ txn.end();
}
return x;
}
@@ -133,8 +133,8 @@ public class Txn {
// Attempt some kind of cleanup.
private static <T extends Transactional> void onThrowable(Throwable th, T
txn) {
try {
- txn.abort() ;
- txn.end() ;
+ txn.abort();
+ txn.end();
} catch (Throwable th2) { th.addSuppressed(th2); }
}
}