Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TakeIfExistsWatcher.java Sun Jul 5 11:41:39 2020 @@ -1,307 +1,310 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger; - -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import net.jini.core.transaction.TransactionException; -import net.jini.id.Uuid; -import net.jini.space.InternalSpaceException; - -/** - * Subclass of <code>QueryWatcher</code> for <code>takeIfExists</code> - * queries. 
Resolves with the first matching transition where the - * entry is visible to the associated transaction and the entry is - * still available, or of the locked entry set goes empty. - */ -class TakeIfExistsWatcher extends SingletonQueryWatcher - implements IfExistsWatcher, Transactable -{ - /** - * The set of entries that would match but are currently - * unavailable (e.g. they are locked). We only keep - * the ids, not the entries themselves. - */ - private final Set<Uuid> lockedEntries; - - /** - * Set <code>true</code> once the query thread is - * done processing the backlog. Once this is - * <code>true</code> it is ok to resolve if - * <code>lockedEntries</code> is empty. - */ - private boolean backlogFinished = false; - - /** - * If non-null the transaction this query is - * being performed under. If <code>null</code> - * this query is not associated with a transaction. - */ - private final Txn txn; - - /** - * Set of entries (represented by <code>EntryHolder</code>s) that - * we would have liked to return, but have been provisionally - * removed. - */ - private final Set<EntryHandle> provisionallyRemovedEntrySet; - - /** - * Create a new <code>TakeIfExistsWatcher</code>. - * @param expiration the initial expiration time - * for this <code>TransitionWatcher</code> in - * milliseconds since the beginning of the epoch. - * @param timestamp the value that is used - * to sort <code>TransitionWatcher</code>s. - * @param startOrdinal the highest ordinal associated - * with operations that are considered to have occurred - * before the operation associated with this watcher. - * @param lockedEntries Set of entries (by their IDs) - * that match but are unavailable. Must be non-empty. - * Keeps a reference to this object. 
- * @param provisionallyRemovedEntrySet If the watcher encounters - * an entry that can not be read/taken because it has been - * provisionally removed then its handle will be placed in - * this <code>WeakHashMap</code> as a key (with null as the - * value). May be <code>null</code> in which case - * provisionally removed entries will not be - * recorded. Ensures that object is only accessed by one - * thread at a time - * @param txn If the query is being performed under - * a transaction the <code>Txn</code> object - * associated with that transaction. - * @throws NullPointerException if <code>lockedEntries</code> is - * <code>null</code>. - */ - TakeIfExistsWatcher(long expiration, long timestamp, - long startOrdinal, Set<Uuid> lockedEntries, - Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn) - { - super(expiration, timestamp, startOrdinal); - - if (lockedEntries == null) - throw new NullPointerException("lockedEntries must be non-null"); - - this.lockedEntries = lockedEntries; - this.txn = txn; - this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; - } - - boolean isInterested(EntryTransition transition, long ordinal) { - /* If we are unresolved pretty much all transitions are - * interesting because we may need to update - * lockedEntries. - * - * Note, !isResolved() without the lock will result only in - * false positives, not false negatives - it will only - * cause isInterested() to return false if we are resolved, - * we may still return true if we are resolved though. - */ - return (ordinal>startOrdinal) && !isResolved(); - } - - synchronized void process(EntryTransition transition, long now) { - if (isResolved()) - return; // Already done. 
- - final EntryHandle handle = transition.getHandle(); - final EntryRep rep = handle.rep(); - final boolean isAvailable = transition.isAvailable(); - final TransactableMgr transitionTxn = transition.getTxn(); - - /* If it at one time it was available to our transaction - * it may still be, try to get it. - */ - if (isAvailable && - ((null == transitionTxn) || txn == transitionTxn)) { - /* Is it still available? */ - if (getServer().attemptCapture(handle, txn, true, null, - provisionallyRemovedEntrySet, now, this)) - { - // Got it - resolve(handle, null); - } else { - /* Must not have been able to get it. Either - * locked under a conflicting lock, in which - * case it should be in our lockedEntries set, or - * it has been removed, in which it still needs - * to be in our lockedEntries since it may have - * been replaced before being removed. - */ - lockedEntries.add(rep.id()); - } - } else if (isAvailable) { // but it is not visible to txn - /* If we are here then at one time it must have been was - * available but not visible to us, implying that it was an - * entry written under a transaction and is interesting to - * us, but not yet visible. We need to add it lockedEntries - * even if has been removed since it could have gotten - * replaced before it was removed. If we did not - * add it we would be acting on the future. - */ - lockedEntries.add(rep.id()); - } else { - /* Must not be available, transition must mark - * the resolution of the transaction in such away - * that the entry has been removed, remove it - * from the set and see if that makes the set empty. - */ - lockedEntries.remove(rep.id()); - if (backlogFinished && lockedEntries.isEmpty()) - resolve(null, null); - } - } - - synchronized boolean catchUp(EntryTransition transition, long now) { - if (isResolved()) - return true; // Already done. 
- - final EntryHandle handle = transition.getHandle(); - final EntryRep rep = handle.rep(); - final boolean isAvailable = transition.isAvailable(); - final TransactableMgr transitionTxn = transition.getTxn(); - - /* If it at one time it was available to our transaction - * it may still be, try to get it. - */ - if (isAvailable && - ((null == transitionTxn) || txn == transitionTxn)) { - /* Is it still available? Try to get it. attemptCapture will - * add the entry to lockedEntries for us if we could not - * get it and it is still in the space (but locked). - * Nothing will be added if it has been removed outright. - * This is ok even though we are peaking into the future - - * we won't act on that information until the future - * comes to pass. - */ - if (getServer().attemptCapture(handle, txn, true, - lockedEntries, provisionallyRemovedEntrySet, now, this)) - { - // Got it - resolve(handle, null); - return true; - } - - // did not resolve - return false; - } - - if (isAvailable) { // but it is not visible to txn - /* If we are here then at one time it must have been was - * available but not visible to us, implying that it was - * an entry written under a transaction and is interesting - * to us, but not yet visible. We only add if it has not - * already been removed. It might have gotten replaced - * before removal, but since we won't let this query get - * resolved with a definitive null before we get past the - * point in the journal where it was removed it is ok to - * never put it in (and if we did put it in it might never - * get removed since process() may have already processed - * the removal). We don't need to check to see it the - * entry has been provisionally removed since if has been - * provisional removal does not put in entries in the - * journal and if it is provisionally removed it has not - * yet been removed so the remove recored has not yet been - * created (must less processed). 
- */ - synchronized (handle) { - if (!handle.removed()) { - lockedEntries.add(rep.id()); - } - - /* If it has been removed, there is no way it - * will be interesting to us again, ever. - */ - } - // Either way, still not resolved. - return false; - } - - /* Must not be available, transition must mark - * the resolution of the transaction in such away - * that the entry has been removed, remove it - * from the set (don't need to check for empty - * because we haven't gotten to the point - * where we can resolve with a definitive null.) - */ - lockedEntries.remove(rep.id()); - return false; - } - - /** - * Once the backlog is complete we can resolve if - * lockedEntries is/becomes empty. - */ - public synchronized void caughtUp() { - backlogFinished = true; - - if (isResolved()) - return; // Don't much mater. - - if (lockedEntries.isEmpty()) - resolve(null, null); - } - - public synchronized boolean isLockedEntrySetEmpty() { - if (!isResolved()) - throw new IllegalStateException("Query not yet resolved"); - return lockedEntries.isEmpty(); - } - - /** - * If a transaction ends in the middle of a query we want - * to throw an exception to the client making the query - * not the <code>Txn</code> calling us here.) - */ - public synchronized int prepare(TransactableMgr mgr, - OutriggerServerImpl space) - { - assert txn != null:"Transactable method called on a " + - "non-transactional TakeIfExistsWatcher"; - - // only throw an exception if we are not resolved. - if (!isResolved()) { - // Query still in progress, kill it - resolve(null, new TransactionException("completed while " + - "operation in progress")); - } - - // If this object has made changes they have been recorded elsewhere - return NOTCHANGED; - } - - /** - * This should never happen since we always return - * <code>NOTCHANGED</code> from <code>prepare</code>. 
- */ - public void commit(TransactableMgr mgr, OutriggerServerImpl space) { - throw new InternalSpaceException("committing a blocking query"); - - } - - /** - * If a transaction ends in the middle of a query we want - * to throw an exception to the client making the query - * (not the <code>Txn</code> calling us here.) - */ - public void abort(TransactableMgr mgr, OutriggerServerImpl space) { - // prepare does the right thing, and should forever - prepare(mgr, space); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger; + +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import net.jini.core.transaction.TransactionException; +import net.jini.id.Uuid; +import net.jini.space.InternalSpaceException; +import org.apache.river.outrigger.proxy.EntryRep; + + + +/** + * Subclass of <code>QueryWatcher</code> for <code>takeIfExists</code> + * queries. Resolves with the first matching transition where the + * entry is visible to the associated transaction and the entry is + * still available, or of the locked entry set goes empty. 
+ */ +class TakeIfExistsWatcher extends SingletonQueryWatcher + implements IfExistsWatcher, Transactable +{ + /** + * The set of entries that would match but are currently + * unavailable (e.g. they are locked). We only keep + * the ids, not the entries themselves. + */ + private final Set<Uuid> lockedEntries; + + /** + * Set <code>true</code> once the query thread is + * done processing the backlog. Once this is + * <code>true</code> it is ok to resolve if + * <code>lockedEntries</code> is empty. + */ + private boolean backlogFinished = false; + + /** + * If non-null the transaction this query is + * being performed under. If <code>null</code> + * this query is not associated with a transaction. + */ + private final Txn txn; + + /** + * Set of entries (represented by <code>EntryHolder</code>s) that + * we would have liked to return, but have been provisionally + * removed. + */ + private final Set<EntryHandle> provisionallyRemovedEntrySet; + + /** + * Create a new <code>TakeIfExistsWatcher</code>. + * @param expiration the initial expiration time + * for this <code>TransitionWatcher</code> in + * milliseconds since the beginning of the epoch. + * @param timestamp the value that is used + * to sort <code>TransitionWatcher</code>s. + * @param startOrdinal the highest ordinal associated + * with operations that are considered to have occurred + * before the operation associated with this watcher. + * @param lockedEntries Set of entries (by their IDs) + * that match but are unavailable. Must be non-empty. + * Keeps a reference to this object. + * @param provisionallyRemovedEntrySet If the watcher encounters + * an entry that can not be read/taken because it has been + * provisionally removed then its handle will be placed in + * this <code>WeakHashMap</code> as a key (with null as the + * value). May be <code>null</code> in which case + * provisionally removed entries will not be + * recorded. 
Ensures that object is only accessed by one + * thread at a time + * @param txn If the query is being performed under + * a transaction the <code>Txn</code> object + * associated with that transaction. + * @throws NullPointerException if <code>lockedEntries</code> is + * <code>null</code>. + */ + TakeIfExistsWatcher(long expiration, long timestamp, + long startOrdinal, Set<Uuid> lockedEntries, + Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn) + { + super(expiration, timestamp, startOrdinal); + + if (lockedEntries == null) + throw new NullPointerException("lockedEntries must be non-null"); + + this.lockedEntries = lockedEntries; + this.txn = txn; + this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; + } + + boolean isInterested(EntryTransition transition, long ordinal) { + /* If we are unresolved pretty much all transitions are + * interesting because we may need to update + * lockedEntries. + * + * Note, !isResolved() without the lock will result only in + * false positives, not false negatives - it will only + * cause isInterested() to return false if we are resolved, + * we may still return true if we are resolved though. + */ + return (ordinal>startOrdinal) && !isResolved(); + } + + synchronized void process(EntryTransition transition, long now) { + if (isResolved()) + return; // Already done. + + final EntryHandle handle = transition.getHandle(); + final EntryRep rep = handle.rep(); + final boolean isAvailable = transition.isAvailable(); + final TransactableMgr transitionTxn = transition.getTxn(); + + /* If it at one time it was available to our transaction + * it may still be, try to get it. + */ + if (isAvailable && + ((null == transitionTxn) || txn == transitionTxn)) { + /* Is it still available? */ + if (getServer().attemptCapture(handle, txn, true, null, + provisionallyRemovedEntrySet, now, this)) + { + // Got it + resolve(handle, null); + } else { + /* Must not have been able to get it. 
Either + * locked under a conflicting lock, in which + * case it should be in our lockedEntries set, or + * it has been removed, in which it still needs + * to be in our lockedEntries since it may have + * been replaced before being removed. + */ + lockedEntries.add(rep.id()); + } + } else if (isAvailable) { // but it is not visible to txn + /* If we are here then at one time it must have been was + * available but not visible to us, implying that it was an + * entry written under a transaction and is interesting to + * us, but not yet visible. We need to add it lockedEntries + * even if has been removed since it could have gotten + * replaced before it was removed. If we did not + * add it we would be acting on the future. + */ + lockedEntries.add(rep.id()); + } else { + /* Must not be available, transition must mark + * the resolution of the transaction in such away + * that the entry has been removed, remove it + * from the set and see if that makes the set empty. + */ + lockedEntries.remove(rep.id()); + if (backlogFinished && lockedEntries.isEmpty()) + resolve(null, null); + } + } + + synchronized boolean catchUp(EntryTransition transition, long now) { + if (isResolved()) + return true; // Already done. + + final EntryHandle handle = transition.getHandle(); + final EntryRep rep = handle.rep(); + final boolean isAvailable = transition.isAvailable(); + final TransactableMgr transitionTxn = transition.getTxn(); + + /* If it at one time it was available to our transaction + * it may still be, try to get it. + */ + if (isAvailable && + ((null == transitionTxn) || txn == transitionTxn)) { + /* Is it still available? Try to get it. attemptCapture will + * add the entry to lockedEntries for us if we could not + * get it and it is still in the space (but locked). + * Nothing will be added if it has been removed outright. + * This is ok even though we are peaking into the future - + * we won't act on that information until the future + * comes to pass. 
+ */ + if (getServer().attemptCapture(handle, txn, true, + lockedEntries, provisionallyRemovedEntrySet, now, this)) + { + // Got it + resolve(handle, null); + return true; + } + + // did not resolve + return false; + } + + if (isAvailable) { // but it is not visible to txn + /* If we are here then at one time it must have been was + * available but not visible to us, implying that it was + * an entry written under a transaction and is interesting + * to us, but not yet visible. We only add if it has not + * already been removed. It might have gotten replaced + * before removal, but since we won't let this query get + * resolved with a definitive null before we get past the + * point in the journal where it was removed it is ok to + * never put it in (and if we did put it in it might never + * get removed since process() may have already processed + * the removal). We don't need to check to see it the + * entry has been provisionally removed since if has been + * provisional removal does not put in entries in the + * journal and if it is provisionally removed it has not + * yet been removed so the remove recored has not yet been + * created (must less processed). + */ + synchronized (handle) { + if (!handle.removed()) { + lockedEntries.add(rep.id()); + } + + /* If it has been removed, there is no way it + * will be interesting to us again, ever. + */ + } + // Either way, still not resolved. + return false; + } + + /* Must not be available, transition must mark + * the resolution of the transaction in such away + * that the entry has been removed, remove it + * from the set (don't need to check for empty + * because we haven't gotten to the point + * where we can resolve with a definitive null.) + */ + lockedEntries.remove(rep.id()); + return false; + } + + /** + * Once the backlog is complete we can resolve if + * lockedEntries is/becomes empty. 
+ */ + public synchronized void caughtUp() { + backlogFinished = true; + + if (isResolved()) + return; // Don't much mater. + + if (lockedEntries.isEmpty()) + resolve(null, null); + } + + public synchronized boolean isLockedEntrySetEmpty() { + if (!isResolved()) + throw new IllegalStateException("Query not yet resolved"); + return lockedEntries.isEmpty(); + } + + /** + * If a transaction ends in the middle of a query we want + * to throw an exception to the client making the query + * not the <code>Txn</code> calling us here.) + */ + public synchronized int prepare(TransactableMgr mgr, + OutriggerServerImpl space) + { + assert txn != null:"Transactable method called on a " + + "non-transactional TakeIfExistsWatcher"; + + // only throw an exception if we are not resolved. + if (!isResolved()) { + // Query still in progress, kill it + resolve(null, new TransactionException("completed while " + + "operation in progress")); + } + + // If this object has made changes they have been recorded elsewhere + return NOTCHANGED; + } + + /** + * This should never happen since we always return + * <code>NOTCHANGED</code> from <code>prepare</code>. + */ + public void commit(TransactableMgr mgr, OutriggerServerImpl space) { + throw new InternalSpaceException("committing a blocking query"); + + } + + /** + * If a transaction ends in the middle of a query we want + * to throw an exception to the client making the query + * (not the <code>Txn</code> calling us here.) + */ + public void abort(TransactableMgr mgr, OutriggerServerImpl space) { + // prepare does the right thing, and should forever + prepare(mgr, space); + } +}
Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TemplateHandle.java Sun Jul 5 11:41:39 2020 @@ -1,232 +1,234 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.river.outrigger; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.Set; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.Vector; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; -import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; - -/** - * <code>TemplateHandle</code> associates one or more - * <code>TransitionWatcher</code>s with a template. - * Unless otherwise noted all methods are thread safe. - */ -class TemplateHandle extends BaseHandle { - - /** - * The watchers. We use a <code>HashSet</code> because we will - * probably do a fair number of removals for each traversal and - * the number of watchers managed by one <code>TemplateHandle</code> - * will probably never get very large. If this does become an - * issue making <code>TransitionWatcher</code> extend - * <code>FastList.Node</code> and using a <code>FastList</code> - * here would probably be a good choice (though that would require - * changing <code>FastList</code> to support overlapping traversals - * of different lists from the same thread.) - */ - final private Set<TransitionWatcher> watchers - = Collections.newSetFromMap( - new ConcurrentHashMap<TransitionWatcher,Boolean>()); - /** - * WriteLock guarantees that no updates can be performed during a - * removal operation. - */ - final private WriteLock wl; - final private ReadLock rl; - private boolean removed; // mutate with wl, read with rl - - /** - * The <code>WatchersForTemplateClass</code> this object - * belongs to. - */ - final private OutriggerServerImpl owner; - - /** - * Create a handle for the template <code>tmpl</code>. 
- */ - TemplateHandle(EntryRep tmpl, OutriggerServerImpl owner, Queue<TemplateHandle> content) { - super(tmpl, content); - this.owner = owner; - ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); - wl = rwl.writeLock(); - rl = rwl.readLock(); - } - - /** - * Return the description for the given field count. - */ - EntryHandleTmplDesc descFor(int index) { - return EntryHandle.descFor(rep(), index); - } - - /** - * Return <code>true</code> if this template matches the given entry. - */ - boolean matches(EntryRep entry) { - return rep().matches(entry); - } - - /** - * Add a watcher to this handle. Assumes that the handle has not - * been removed. - * @param watcher the watcher to be added. - * @return true if watcher is added, false otherwise. - * @throws NullPointerException if watcher is <code>null</code>. - */ - boolean addTransitionWatcher(TransitionWatcher watcher) { - if (watcher == null) - throw new NullPointerException("Watcher can not be null"); - rl.lock(); - try { - if (removed) return false; - if (watcher.addTemplateHandle(this)) { - return watchers.add(watcher); - } - return false; - } finally { - rl.unlock(); - } - } - - /** - * Remote a watcher from this handle. Does nothing - * if the specified watcher is not associated with - * this <code>TemplateHandle</code>. - * @param watcher the watcher to be removed. - * @throws NullPointerException if watcher is <code>null</code>. - */ - void removeTransitionWatcher(TransitionWatcher watcher) { - if (watcher == null) - throw new NullPointerException("Watcher can not be null"); - rl.lock(); - try { - watchers.remove(watcher); - } finally { - rl.unlock(); - } - } - - /** - * Iterate over the watchers associated with - * this handle calling <code>isInterested</code> on each - * and if it returns <code>true</code> adding the watcher to the - * passed set. - * - * @param set The set to accumulate interested watchers - * into. - * @param transition The transition being processed. 
- * @param ordinal The ordinal associated with <code>transition</code>. - * @throws NullPointerException if either argument is <code>null</code>. - */ - void collectInterested(Set<TransitionWatcher> set, EntryTransition transition, - long ordinal) - { - rl.lock(); - try { - if (removed) return; - final Iterator i = watchers.iterator(); - while (i.hasNext()) { - final TransitionWatcher w = (TransitionWatcher)i.next(); - if (w.isInterested(transition, ordinal)) { - set.add(w); - } - } - } finally { - rl.unlock(); - } - } - - /** - * Return the <code>OutriggerServerImpl</code> this - * handle is part of. - * @return The <code>OutriggerServerImpl</code> this - * handle is part of. - */ - OutriggerServerImpl getServer() { - return owner; - } - - /** - * Visit each <code>TransitionWatcher</code> and check to see if - * it has expired, removing it if it has. - * @param now an estimate of the current time expressed as - * milliseconds since the beginning of the epoch. - */ - void reap(long now) { - rl.lock(); - try{ - Iterator<TransitionWatcher> it = watchers.iterator(); - while (it.hasNext()){ - it.next().removeIfExpired(now); - } - } finally { - rl.unlock(); - } - } - - - /** - * Need to lock on the wl so no one will - * add a watcher between the check for empty and - * when it gets removed. - */ - boolean removeIfEmpty(){ - wl.lock(); - try { - if (watchers.isEmpty()) { - return remove(); - } - return false; - } finally { - wl.unlock(); - } - } - - @Override - public boolean removed() { - rl.lock(); - try { - return removed; - } finally { - rl.unlock(); - } - } - - @Override - public boolean remove() { - wl.lock(); - try { - if (removed){ - return false; // already removed. - } else { - removed = super.remove(); - return removed; - } - } finally { - wl.unlock(); - } - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Set; +import java.util.Iterator; +import java.util.List; +import java.util.Queue; +import java.util.Vector; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; +import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; +import org.apache.river.outrigger.proxy.EntryRep; + + +/** + * <code>TemplateHandle</code> associates one or more + * <code>TransitionWatcher</code>s with a template. + * Unless otherwise noted all methods are thread safe. + */ +class TemplateHandle extends BaseHandle { + + /** + * The watchers. We use a <code>HashSet</code> because we will + * probably do a fair number of removals for each traversal and + * the number of watchers managed by one <code>TemplateHandle</code> + * will probably never get very large. 
If this does become an + * issue making <code>TransitionWatcher</code> extend + * <code>FastList.Node</code> and using a <code>FastList</code> + * here would probably be a good choice (though that would require + * changing <code>FastList</code> to support overlapping traversals + * of different lists from the same thread.) + */ + final private Set<TransitionWatcher> watchers + = Collections.newSetFromMap( + new ConcurrentHashMap<TransitionWatcher,Boolean>()); + /** + * WriteLock guarantees that no updates can be performed during a + * removal operation. + */ + final private WriteLock wl; + final private ReadLock rl; + private boolean removed; // mutate with wl, read with rl + + /** + * The <code>WatchersForTemplateClass</code> this object + * belongs to. + */ + final private OutriggerServerImpl owner; + + /** + * Create a handle for the template <code>tmpl</code>. + */ + TemplateHandle(EntryRep tmpl, OutriggerServerImpl owner, Queue<TemplateHandle> content) { + super(tmpl, content); + this.owner = owner; + ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + wl = rwl.writeLock(); + rl = rwl.readLock(); + } + + /** + * Return the description for the given field count. + */ + EntryHandleTmplDesc descFor(int index) { + return EntryHandle.descFor(rep(), index); + } + + /** + * Return <code>true</code> if this template matches the given entry. + */ + boolean matches(EntryRep entry) { + return rep().matches(entry); + } + + /** + * Add a watcher to this handle. Assumes that the handle has not + * been removed. + * @param watcher the watcher to be added. + * @return true if watcher is added, false otherwise. + * @throws NullPointerException if watcher is <code>null</code>. 
+ */ + boolean addTransitionWatcher(TransitionWatcher watcher) { + if (watcher == null) + throw new NullPointerException("Watcher can not be null"); + rl.lock(); + try { + if (removed) return false; + if (watcher.addTemplateHandle(this)) { + return watchers.add(watcher); + } + return false; + } finally { + rl.unlock(); + } + } + + /** + * Remote a watcher from this handle. Does nothing + * if the specified watcher is not associated with + * this <code>TemplateHandle</code>. + * @param watcher the watcher to be removed. + * @throws NullPointerException if watcher is <code>null</code>. + */ + void removeTransitionWatcher(TransitionWatcher watcher) { + if (watcher == null) + throw new NullPointerException("Watcher can not be null"); + rl.lock(); + try { + watchers.remove(watcher); + } finally { + rl.unlock(); + } + } + + /** + * Iterate over the watchers associated with + * this handle calling <code>isInterested</code> on each + * and if it returns <code>true</code> adding the watcher to the + * passed set. + * + * @param set The set to accumulate interested watchers + * into. + * @param transition The transition being processed. + * @param ordinal The ordinal associated with <code>transition</code>. + * @throws NullPointerException if either argument is <code>null</code>. + */ + void collectInterested(Set<TransitionWatcher> set, EntryTransition transition, + long ordinal) + { + rl.lock(); + try { + if (removed) return; + final Iterator i = watchers.iterator(); + while (i.hasNext()) { + final TransitionWatcher w = (TransitionWatcher)i.next(); + if (w.isInterested(transition, ordinal)) { + set.add(w); + } + } + } finally { + rl.unlock(); + } + } + + /** + * Return the <code>OutriggerServerImpl</code> this + * handle is part of. + * @return The <code>OutriggerServerImpl</code> this + * handle is part of. 
+ */ + OutriggerServerImpl getServer() { + return owner; + } + + /** + * Visit each <code>TransitionWatcher</code> and check to see if + * it has expired, removing it if it has. + * @param now an estimate of the current time expressed as + * milliseconds since the beginning of the epoch. + */ + void reap(long now) { + rl.lock(); + try{ + Iterator<TransitionWatcher> it = watchers.iterator(); + while (it.hasNext()){ + it.next().removeIfExpired(now); + } + } finally { + rl.unlock(); + } + } + + + /** + * Need to lock on the wl so no one will + * add a watcher between the check for empty and + * when it gets removed. + */ + boolean removeIfEmpty(){ + wl.lock(); + try { + if (watchers.isEmpty()) { + return remove(); + } + return false; + } finally { + wl.unlock(); + } + } + + @Override + public boolean removed() { + rl.lock(); + try { + return removed; + } finally { + rl.unlock(); + } + } + + @Override + public boolean remove() { + wl.lock(); + try { + if (removed){ + return false; // already removed. 
+ } else { + removed = super.remove(); + return removed; + } + } finally { + wl.unlock(); + } + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransactableReadIfExistsWatcher.java Sun Jul 5 11:41:39 2020 @@ -1,323 +1,326 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.river.outrigger; - -import java.util.Map; -import java.util.Set; -import java.util.WeakHashMap; -import net.jini.core.transaction.TransactionException; -import net.jini.id.Uuid; -import net.jini.space.InternalSpaceException; - -/** - * Subclass of <code>QueryWatcher</code> for and transactional - * <code>readIfExists</code> queries. Resolves with the first - * matching transition where the entry is visible to the associated - * transaction and the entry is still available, or of the locked - * entry set goes empty. - */ -class TransactableReadIfExistsWatcher extends SingletonQueryWatcher - implements IfExistsWatcher, Transactable -{ - /** - * The set of entries that would match but are currently - * unavailable (e.g. they are locked). We only keep - * the ids, not the entries themselves. - */ - private final Set<Uuid> lockedEntries; - - /** - * Set <code>true</code> once the query thread is - * done processing the backlog. Once this is - * <code>true</code> it is ok to resolve if - * <code>lockedEntries</code> is empty. - */ - private boolean backlogFinished = false; - - /** - * The transaction this query is - * being performed under. - */ - private final Txn txn; - - /** - * Set of entries (represented by <code>EntryHolder</code>s) that - * we would have liked to return, but have been provisionally - * removed. - */ - private final Set<EntryHandle> provisionallyRemovedEntrySet; - - /** - * Create a new <code>TransactableReadIfExistsWatcher</code>. - * @param expiration the initial expiration time - * for this <code>TransitionWatcher</code> in - * milliseconds since the beginning of the epoch. - * @param timestamp the value that is used - * to sort <code>TransitionWatcher</code>s. - * @param startOrdinal the highest ordinal associated - * with operations that are considered to have occurred - * before the operation associated with this watcher. - * @param lockedEntries Set of entries (by their IDs) - * that match but are unavailable. 
Must be non-empty. - * Keeps a reference to this object. - * @param provisionallyRemovedEntrySet If the watcher encounters - * an entry that can not be read/taken because it has been - * provisionally removed then its handle will be placed in - * this <code>WeakHashMap</code> as a key (with null as the - * value). May be <code>null</code> in which case - * provisionally removed entries will not be - * recorded. Ensures that object is only accessed by one - * thread at a time - * @param txn If the query is being performed under - * a transaction the <code>Txn</code> object - * associated with that transaction. - * @throws NullPointerException if <code>lockedEntries</code> or - * <code>txn</code> is <code>null</code>. - */ - TransactableReadIfExistsWatcher(long expiration, long timestamp, - long startOrdinal, Set<Uuid> lockedEntries, - Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn) - { - super(expiration, timestamp, startOrdinal); - - if (lockedEntries == null) - throw new NullPointerException("lockedEntries must be non-null"); - - if (txn == null) - throw new NullPointerException("txn must be non-null"); - - this.lockedEntries = lockedEntries; - this.txn = txn; - this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; - } - - boolean isInterested(EntryTransition transition, long ordinal) { - /* If we are unresolved pretty much all transitions are - * interesting because we may need to update - * lockedEntries. The only exception is read locks being - * resolved. It is important that transitions triggered by the - * release of read locks get filtered out - otherwise process - * could end up adding and removing elements to lockedEntries - * when it shouldn't. - * - * Note, !isResolved() without the lock will result only in - * false positives, not false negatives - it will only - * cause isInterested() to return false if we are resolved, - * we may still return true if we are resolved though. 
- */ - if (!transition.isVisible() && transition.isAvailable()) { - /* must be a transition triggered by a read lock release, - * wont change anything so ignore it. - */ - return false; - } - - return (ordinal>startOrdinal) && !isResolved(); - } - - synchronized void process(EntryTransition transition, long now) { - if (isResolved()) - return; // Already done. - - final EntryHandle handle = transition.getHandle(); - final EntryRep rep = handle.rep(); - final boolean isVisible = transition.isVisible(); - final TransactableMgr transitionTxn = transition.getTxn(); - - /* If it at one time it was available to our transaction - * it may still be, try to get it. - */ - if (isVisible && - ((null == transitionTxn) || txn == transitionTxn)) { - /* Is it still available? */ - if (getServer().attemptCapture(handle, txn, false, null, - provisionallyRemovedEntrySet, now, this)) - { - // Got it - resolve(handle, null); - } else { - /* Must not have been able to get it. Either - * locked under a conflicting lock, in which - * case it should be in our lockedEntries set, or - * it has been removed, in which it still needs - * to be in our lockedEntries since it may have - * been replaced before being removed. - */ - lockedEntries.add(rep.id()); - } - } else if (isVisible) { // but it is not visible to txn - /* If we are here then at one time it must have been was - * visible but not visible to us, implying that it was an - * entry written under a transaction and is interesting to - * us, but not yet visible. We need to add it lockedEntries - * even if has been removed since it could have gotten - * replaced before it was removed. If we did not - * add it we would be acting on the future. - */ - lockedEntries.add(rep.id()); - } else { - /* Must not be available, transition must mark - * the resolution of the transaction in such away - * that the entry has been removed, remove it - * from the set and see if that makes the set empty. 
- */ - lockedEntries.remove(rep.id()); - if (backlogFinished && lockedEntries.isEmpty()) - resolve(null, null); - } - } - - synchronized boolean catchUp(EntryTransition transition, long now) { - if (isResolved()) - return true; // Already done. - - final EntryHandle handle = transition.getHandle(); - final EntryRep rep = handle.rep(); - final boolean isVisible = transition.isVisible(); - final TransactableMgr transitionTxn = transition.getTxn(); - - /* Was this the resolution of a read lock? if so ignore */ - if (!isVisible && transition.isAvailable()) - return false; - - /* If it at one time it was available to our transaction - * it may still be, try to get it. - */ - if (isVisible && - ((null == transitionTxn) || txn == transitionTxn)) { - /* Is it still visible? Try to get it. attemptCapture will - * add the entry to lockedEntries for us if we could not - * get it and it is still in the space (but locked). - * Nothing will be added if it has been removed outright. - * This is ok even though we are peaking into the future - - * we won't act on that information until the future - * comes to pass. - */ - if (getServer().attemptCapture(handle, txn, false, - lockedEntries, provisionallyRemovedEntrySet, now, this)) - { - // Got it - resolve(handle, null); - return true; - } - - // did not resolve - return false; - } - - if (isVisible) { // but it is not visible to txn - /* If we are here then at one time it must have been was - * visible but not visible to us, implying that it was - * an entry written under a transaction and is interesting - * to us, but not yet visible. We only add if it has not - * already been removed. It might have gotten replaced - * before removal, but since we won't let this query get - * resolved with a definitive null before we get past the - * point in the journal where it was removed it is ok to - * never put it in (and if we did put it in it might never - * get removed since process() may have already processed - * the removal). 
We don't need to check to see it the - * entry has been provisionally removed since if has been - * provisional removal does not put in entries in the - * journal and if it is provisionally removed it has not - * yet been removed so the remove recored has not yet been - * created (must less processed). - */ - synchronized (handle) { - if (!handle.removed()) { - lockedEntries.add(rep.id()); - } - - /* If it has been removed, there is no way it - * will be interesting to us again, ever. - */ - } - // Either way, still not resolved. - return false; - } - - /* Must not be visible (and because of the first test can't be - * available either - that is transition can't just be the - * release of a read lock), transition must mark the - * resolution of the transaction in such away that the entry - * has been removed, remove it from the set (don't need to - * check for empty because we haven't gotten to the point - * where we can resolve with a definitive null.) - */ - lockedEntries.remove(rep.id()); - return false; - } - - /** - * Once the backlog is complete we can resolve if - * lockedEntries is/becomes empty. - */ - public synchronized void caughtUp() { - backlogFinished = true; - - if (isResolved()) - return; // Don't much mater. - - if (lockedEntries.isEmpty()) - resolve(null, null); - } - - public synchronized boolean isLockedEntrySetEmpty() { - if (!isResolved()) - throw new IllegalStateException("Query not yet resolved"); - return lockedEntries.isEmpty(); - } - - /** - * If a transaction ends in the middle of a query we want - * to throw an exception to the client making the query - * not the <code>Txn</code> calling us here.) - */ - public synchronized int prepare(TransactableMgr mgr, - OutriggerServerImpl space) - { - // only throw an exception if we are not resolved. 
- if (!isResolved()) { - // Query still in progress, kill it - resolve(null, new TransactionException("completed while " + - "operation in progress")); - } - - // If this object has made changes they have been recorded elsewhere - return NOTCHANGED; - } - - /** - * This should never happen since we always return - * <code>NOTCHANGED</code> from <code>prepare</code>. - */ - public void commit(TransactableMgr mgr, OutriggerServerImpl space) { - throw new InternalSpaceException("committing a blocking query"); - - } - - /** - * If a transaction ends in the middle of a query we want - * to throw an exception to the client making the query - * (not the <code>Txn</code> calling us here.) - */ - public void abort(TransactableMgr mgr, OutriggerServerImpl space) { - // prepare does the right thing, and should forever - prepare(mgr, space); - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.river.outrigger; + +import java.util.Map; +import java.util.Set; +import java.util.WeakHashMap; +import net.jini.core.transaction.TransactionException; +import net.jini.id.Uuid; +import net.jini.space.InternalSpaceException; +import org.apache.river.outrigger.proxy.EntryRep; + + + +/** + * Subclass of <code>QueryWatcher</code> for and transactional + * <code>readIfExists</code> queries. Resolves with the first + * matching transition where the entry is visible to the associated + * transaction and the entry is still available, or of the locked + * entry set goes empty. + */ +class TransactableReadIfExistsWatcher extends SingletonQueryWatcher + implements IfExistsWatcher, Transactable +{ + /** + * The set of entries that would match but are currently + * unavailable (e.g. they are locked). We only keep + * the ids, not the entries themselves. + */ + private final Set<Uuid> lockedEntries; + + /** + * Set <code>true</code> once the query thread is + * done processing the backlog. Once this is + * <code>true</code> it is ok to resolve if + * <code>lockedEntries</code> is empty. + */ + private boolean backlogFinished = false; + + /** + * The transaction this query is + * being performed under. + */ + private final Txn txn; + + /** + * Set of entries (represented by <code>EntryHolder</code>s) that + * we would have liked to return, but have been provisionally + * removed. + */ + private final Set<EntryHandle> provisionallyRemovedEntrySet; + + /** + * Create a new <code>TransactableReadIfExistsWatcher</code>. + * @param expiration the initial expiration time + * for this <code>TransitionWatcher</code> in + * milliseconds since the beginning of the epoch. + * @param timestamp the value that is used + * to sort <code>TransitionWatcher</code>s. + * @param startOrdinal the highest ordinal associated + * with operations that are considered to have occurred + * before the operation associated with this watcher. 
+ * @param lockedEntries Set of entries (by their IDs) + * that match but are unavailable. Must be non-empty. + * Keeps a reference to this object. + * @param provisionallyRemovedEntrySet If the watcher encounters + * an entry that can not be read/taken because it has been + * provisionally removed then its handle will be placed in + * this <code>WeakHashMap</code> as a key (with null as the + * value). May be <code>null</code> in which case + * provisionally removed entries will not be + * recorded. Ensures that object is only accessed by one + * thread at a time + * @param txn If the query is being performed under + * a transaction the <code>Txn</code> object + * associated with that transaction. + * @throws NullPointerException if <code>lockedEntries</code> or + * <code>txn</code> is <code>null</code>. + */ + TransactableReadIfExistsWatcher(long expiration, long timestamp, + long startOrdinal, Set<Uuid> lockedEntries, + Set<EntryHandle> provisionallyRemovedEntrySet, Txn txn) + { + super(expiration, timestamp, startOrdinal); + + if (lockedEntries == null) + throw new NullPointerException("lockedEntries must be non-null"); + + if (txn == null) + throw new NullPointerException("txn must be non-null"); + + this.lockedEntries = lockedEntries; + this.txn = txn; + this.provisionallyRemovedEntrySet = provisionallyRemovedEntrySet; + } + + boolean isInterested(EntryTransition transition, long ordinal) { + /* If we are unresolved pretty much all transitions are + * interesting because we may need to update + * lockedEntries. The only exception is read locks being + * resolved. It is important that transitions triggered by the + * release of read locks get filtered out - otherwise process + * could end up adding and removing elements to lockedEntries + * when it shouldn't. 
+ * + * Note, !isResolved() without the lock will result only in + * false positives, not false negatives - it will only + * cause isInterested() to return false if we are resolved, + * we may still return true if we are resolved though. + */ + if (!transition.isVisible() && transition.isAvailable()) { + /* must be a transition triggered by a read lock release, + * wont change anything so ignore it. + */ + return false; + } + + return (ordinal>startOrdinal) && !isResolved(); + } + + synchronized void process(EntryTransition transition, long now) { + if (isResolved()) + return; // Already done. + + final EntryHandle handle = transition.getHandle(); + final EntryRep rep = handle.rep(); + final boolean isVisible = transition.isVisible(); + final TransactableMgr transitionTxn = transition.getTxn(); + + /* If it at one time it was available to our transaction + * it may still be, try to get it. + */ + if (isVisible && + ((null == transitionTxn) || txn == transitionTxn)) { + /* Is it still available? */ + if (getServer().attemptCapture(handle, txn, false, null, + provisionallyRemovedEntrySet, now, this)) + { + // Got it + resolve(handle, null); + } else { + /* Must not have been able to get it. Either + * locked under a conflicting lock, in which + * case it should be in our lockedEntries set, or + * it has been removed, in which it still needs + * to be in our lockedEntries since it may have + * been replaced before being removed. + */ + lockedEntries.add(rep.id()); + } + } else if (isVisible) { // but it is not visible to txn + /* If we are here then at one time it must have been was + * visible but not visible to us, implying that it was an + * entry written under a transaction and is interesting to + * us, but not yet visible. We need to add it lockedEntries + * even if has been removed since it could have gotten + * replaced before it was removed. If we did not + * add it we would be acting on the future. 
+ */ + lockedEntries.add(rep.id()); + } else { + /* Must not be available, transition must mark + * the resolution of the transaction in such away + * that the entry has been removed, remove it + * from the set and see if that makes the set empty. + */ + lockedEntries.remove(rep.id()); + if (backlogFinished && lockedEntries.isEmpty()) + resolve(null, null); + } + } + + synchronized boolean catchUp(EntryTransition transition, long now) { + if (isResolved()) + return true; // Already done. + + final EntryHandle handle = transition.getHandle(); + final EntryRep rep = handle.rep(); + final boolean isVisible = transition.isVisible(); + final TransactableMgr transitionTxn = transition.getTxn(); + + /* Was this the resolution of a read lock? if so ignore */ + if (!isVisible && transition.isAvailable()) + return false; + + /* If it at one time it was available to our transaction + * it may still be, try to get it. + */ + if (isVisible && + ((null == transitionTxn) || txn == transitionTxn)) { + /* Is it still visible? Try to get it. attemptCapture will + * add the entry to lockedEntries for us if we could not + * get it and it is still in the space (but locked). + * Nothing will be added if it has been removed outright. + * This is ok even though we are peaking into the future - + * we won't act on that information until the future + * comes to pass. + */ + if (getServer().attemptCapture(handle, txn, false, + lockedEntries, provisionallyRemovedEntrySet, now, this)) + { + // Got it + resolve(handle, null); + return true; + } + + // did not resolve + return false; + } + + if (isVisible) { // but it is not visible to txn + /* If we are here then at one time it must have been was + * visible but not visible to us, implying that it was + * an entry written under a transaction and is interesting + * to us, but not yet visible. We only add if it has not + * already been removed. 
It might have gotten replaced + * before removal, but since we won't let this query get + * resolved with a definitive null before we get past the + * point in the journal where it was removed it is ok to + * never put it in (and if we did put it in it might never + * get removed since process() may have already processed + * the removal). We don't need to check to see it the + * entry has been provisionally removed since if has been + * provisional removal does not put in entries in the + * journal and if it is provisionally removed it has not + * yet been removed so the remove recored has not yet been + * created (must less processed). + */ + synchronized (handle) { + if (!handle.removed()) { + lockedEntries.add(rep.id()); + } + + /* If it has been removed, there is no way it + * will be interesting to us again, ever. + */ + } + // Either way, still not resolved. + return false; + } + + /* Must not be visible (and because of the first test can't be + * available either - that is transition can't just be the + * release of a read lock), transition must mark the + * resolution of the transaction in such away that the entry + * has been removed, remove it from the set (don't need to + * check for empty because we haven't gotten to the point + * where we can resolve with a definitive null.) + */ + lockedEntries.remove(rep.id()); + return false; + } + + /** + * Once the backlog is complete we can resolve if + * lockedEntries is/becomes empty. + */ + public synchronized void caughtUp() { + backlogFinished = true; + + if (isResolved()) + return; // Don't much mater. + + if (lockedEntries.isEmpty()) + resolve(null, null); + } + + public synchronized boolean isLockedEntrySetEmpty() { + if (!isResolved()) + throw new IllegalStateException("Query not yet resolved"); + return lockedEntries.isEmpty(); + } + + /** + * If a transaction ends in the middle of a query we want + * to throw an exception to the client making the query + * not the <code>Txn</code> calling us here.) 
+ */ + public synchronized int prepare(TransactableMgr mgr, + OutriggerServerImpl space) + { + // only throw an exception if we are not resolved. + if (!isResolved()) { + // Query still in progress, kill it + resolve(null, new TransactionException("completed while " + + "operation in progress")); + } + + // If this object has made changes they have been recorded elsewhere + return NOTCHANGED; + } + + /** + * This should never happen since we always return + * <code>NOTCHANGED</code> from <code>prepare</code>. + */ + public void commit(TransactableMgr mgr, OutriggerServerImpl space) { + throw new InternalSpaceException("committing a blocking query"); + + } + + /** + * If a transaction ends in the middle of a query we want + * to throw an exception to the client making the query + * (not the <code>Txn</code> calling us here.) + */ + public void abort(TransactableMgr mgr, OutriggerServerImpl space) { + // prepare does the right thing, and should forever + prepare(mgr, space); + } +} Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransientOutriggerImpl.java Sun Jul 5 11:41:39 2020 @@ -1,53 +1,53 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.river.outrigger; - -import java.io.IOException; -import javax.security.auth.login.LoginException; -import net.jini.config.ConfigurationException; -import org.apache.river.start.LifeCycle; - -/** - * <code>OutriggerServerWrapper</code> subclass for - * transient servers. - * - * @author Sun Microsystems, Inc. - * @since 2.0 - */ -class TransientOutriggerImpl extends OutriggerServerWrapper { - /** - * Create a new transient outrigger server. - * @param configArgs set of strings to be used to obtain a - * <code>Configuration</code>. - * @param lifeCycle the object to notify when this - * service is destroyed. - * @throws IOException if there is problem exporting the server. - * @throws ConfigurationException if the <code>Configuration</code> is - * malformed. - * @throws LoginException if the <code>loginContext</code> specified - * in the configuration is non-null and throws - * an exception when login is attempted. - */ - TransientOutriggerImpl(String[] configArgs, LifeCycle lifeCycle) - throws IOException, ConfigurationException, LoginException - { - super(configArgs, lifeCycle, false); - allowCalls(); - } -} - +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger; + +import java.io.IOException; +import javax.security.auth.login.LoginException; +import net.jini.config.ConfigurationException; +import org.apache.river.start.lifecycle.LifeCycle; + +/** + * <code>OutriggerServerWrapper</code> subclass for + * transient servers. + * + * @author Sun Microsystems, Inc. + * @since 2.0 + */ +class TransientOutriggerImpl extends OutriggerServerWrapper { + /** + * Create a new transient outrigger server. + * @param configArgs set of strings to be used to obtain a + * <code>Configuration</code>. + * @param lifeCycle the object to notify when this + * service is destroyed. + * @throws IOException if there is problem exporting the server. + * @throws ConfigurationException if the <code>Configuration</code> is + * malformed. + * @throws LoginException if the <code>loginContext</code> specified + * in the configuration is non-null and throws + * an exception when login is attempted. 
+ */ + TransientOutriggerImpl(String[] configArgs, LifeCycle lifeCycle) + throws IOException, ConfigurationException, LoginException + { + super(configArgs, lifeCycle, false); + allowCalls(); + } +} + Modified: river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java URL: http://svn.apache.org/viewvc/river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java?rev=1879521&r1=1879520&r2=1879521&view=diff ============================================================================== --- river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java (original) +++ river/jtsk/modules/modularize/apache-river/river-services/outrigger/outrigger-service/src/main/java/org/apache/river/outrigger/TransitionWatchers.java Sun Jul 5 11:41:39 2020 @@ -1,183 +1,185 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.river.outrigger; - -import java.util.Map; -import java.util.Collection; -import java.util.Iterator; -import java.util.SortedSet; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -/** - * Given an <code>EntryHandle</code> who's entry is making a - * visibility transition this class will find all the - * <code>TransitionWatcher</code>s who are interested in that - * transition. The <code>TransitionWatcher</code>s are organized - * into groups using <code>TemplateHandle</code>. Each - * <code>TemplateHandle</code> aggregates a number of watchers - * all interested in the same template. - * - * @see TransitionWatcher - * @author Sun Microsystems, Inc. - */ -class TransitionWatchers { - /** - * A map from class names to <code>WatchersForTemplateClass</code> - * objects - */ - final private ConcurrentMap<String,WatchersForTemplateClass> holders - = new ConcurrentHashMap<String,WatchersForTemplateClass>(); - - /** The server we are working for */ - final private OutriggerServerImpl server; - - /** - * Create a new <code>TransitionWatchers</code> object - * for the specified server. - * @param server The server the new <code>TransitionWatchers</code> - * object is working for. - * @throws NullPointerException if <code>server</code> is - * <code>null</code> - */ - TransitionWatchers(OutriggerServerImpl server) { - this(check(server), server); - } - - private static boolean check(OutriggerServerImpl server) throws NullPointerException { - if (server == null) - throw new NullPointerException("server must be non-null"); - return true; - } - - private TransitionWatchers(boolean checked, OutriggerServerImpl server){ - this.server = server; - } - /** - * Add a <code>TransitionWatcher</code> to the list - * of watchers looking for visibility transitions in - * entries that match the specified template. Associates - * a <code>TemplateHandle</code> using - * <code>TransitionWatcher.setTemplateHandle</code> method. 
- * <p> - * This method is thread safe. The watcher added in this call is - * guaranteed to be consulted by the next call to - * <code>allMatches</code> that starts after this call completes even - * if that call is made from another thread. Also, all of - * of the assigned values in the calling thread's working - * memory will be copied out to main memory as part of the - * process of making the passed watcher visible to future - * <code>allMatches</code> and <code>findTransitionWatcher</code> calls. - * - * @param watcher The <code>TransitionWatcher</code> being added. - * @param template The <code>EntryRep</code> that represents - * the template of interest. - * @throws NullPointerException if either argument is - * <code>null</code>. - */ - void add(TransitionWatcher watcher, EntryRep template) { - // Get/create the appropriate WatchersForTemplateClass - final String className = template.classFor(); - WatchersForTemplateClass holder = holders.get(className); - if (holder == null) { - holder = new WatchersForTemplateClass(server); - WatchersForTemplateClass existed = holders.putIfAbsent(className, holder); - if (existed != null) holder = existed; - } - // Add the watcher to the WatchersForTemplateClass - holder.add(watcher, template); - } - - /** - * Return a <code>SortedSet</code> of all the - * <code>TransitionWatcher</code> who's <code>isInterested</code> - * methods return <code>true</code> when asked about the specified - * visibility transition. - * <p> - * This method is thread safe. This call is guaranteed to check unremoved - * watchers that were added by <code>add</code> calls that completed - * before this call started, even if the calls were made from - * different threads. Before the <code>isInterested</code> method - * of the first watcher is called the working memory of this thread - * will be flushed so any changes made to main memory before - * this call started will be visible. 
- * - * @param transition A <code>EntryTransition</code> that - * describes the transition and what - * entry is transitioning. This method - * will assume that <code>transition.getHandle</code> - * returns a non-null value. - * @param ordinal The ordinal associated with <code>transition</code>. - * @return A new <code>SortedSet</code> of all the - * <code>TransitionWatcher</code>s interested in the specified - * visibility transition. If none are interested an empty - * map will be returned. - * @throws NullPointerException if <code>transition</code> is - * <code>null</code>. - */ - SortedSet<TransitionWatcher> allMatches(EntryTransition transition, long ordinal) { - final EntryRep rep = transition.getHandle().rep(); - final SortedSet<TransitionWatcher> rslt = new java.util.TreeSet<TransitionWatcher>(); - final String className = rep.classFor(); - WatchersForTemplateClass holder; - - /* Collect all the watchers looking for the exact class of the - * transitioned entry. - */ - holder = holders.get(className); - - if (holder != null) holder.collectInterested(rslt, transition, ordinal); - - // Get all the templates that are super classes of className - final String[] superclasses = rep.superclasses(); - for (int i=0; i<superclasses.length; i++) { - holder = holders.get(superclasses[i]); - if (holder != null) - holder.collectInterested(rslt, transition, ordinal); - } - - // Including those registered for the null template - final String nullClass = EntryRep.matchAnyEntryRep().classFor(); - holder = holders.get(nullClass); - if (holder!=null) holder.collectInterested(rslt, transition, ordinal); - return rslt; - } - - /** - * Visit each <code>TransitionWatcher</code> and check to see if - * it has expired, removing it if it has. 
- */ - void reap() { - final long now = System.currentTimeMillis(); - Iterator<WatchersForTemplateClass> watchers = holders.values().iterator(); - while (watchers.hasNext()){ - watchers.next().reap(now); - } - } - - /** - * Return the <code>OutriggerServerImpl</code> this - * <code>TransitionWatchers</code> object is part of. - * @return The <code>OutriggerServerImpl</code> this - * <code>TransitionWatchers</code> is part of. - */ - OutriggerServerImpl getServer() { - return server; - } - -} +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.river.outrigger; + +import java.util.Map; +import java.util.Collection; +import java.util.Iterator; +import java.util.SortedSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import org.apache.river.outrigger.proxy.EntryRep; + + +/** + * Given an <code>EntryHandle</code> whose entry is making a + * visibility transition this class will find all the + * <code>TransitionWatcher</code>s that are interested in that + * transition. The <code>TransitionWatcher</code>s are organized + * into groups using <code>TemplateHandle</code>. 
Each + * <code>TemplateHandle</code> aggregates a number of watchers + * all interested in the same template. + * + * @see TransitionWatcher + * @author Sun Microsystems, Inc. + */ +class TransitionWatchers { + /** + * A map from class names to <code>WatchersForTemplateClass</code> + * objects + */ + final private ConcurrentMap<String,WatchersForTemplateClass> holders + = new ConcurrentHashMap<String,WatchersForTemplateClass>(); + + /** The server we are working for */ + final private OutriggerServerImpl server; + + /** + * Create a new <code>TransitionWatchers</code> object + * for the specified server. + * @param server The server the new <code>TransitionWatchers</code> + * object is working for. + * @throws NullPointerException if <code>server</code> is + * <code>null</code> + */ + TransitionWatchers(OutriggerServerImpl server) { + this(check(server), server); + } + + private static boolean check(OutriggerServerImpl server) throws NullPointerException { + if (server == null) + throw new NullPointerException("server must be non-null"); + return true; + } + + private TransitionWatchers(boolean checked, OutriggerServerImpl server){ + this.server = server; + } + /** + * Add a <code>TransitionWatcher</code> to the list + * of watchers looking for visibility transitions in + * entries that match the specified template. Associates + * a <code>TemplateHandle</code> using the + * <code>TransitionWatcher.setTemplateHandle</code> method. + * <p> + * This method is thread safe. The watcher added in this call is + * guaranteed to be consulted by the next call to + * <code>allMatches</code> that starts after this call completes even + * if that call is made from another thread. Also, all + * of the assigned values in the calling thread's working + * memory will be copied out to main memory as part of the + * process of making the passed watcher visible to future + * <code>allMatches</code> and <code>findTransitionWatcher</code> calls. 
+ * + * @param watcher The <code>TransitionWatcher</code> being added. + * @param template The <code>EntryRep</code> that represents + * the template of interest. + * @throws NullPointerException if either argument is + * <code>null</code>. + */ + void add(TransitionWatcher watcher, EntryRep template) { + // Get/create the appropriate WatchersForTemplateClass + final String className = template.classFor(); + WatchersForTemplateClass holder = holders.get(className); + if (holder == null) { + holder = new WatchersForTemplateClass(server); + WatchersForTemplateClass existed = holders.putIfAbsent(className, holder); + if (existed != null) holder = existed; + } + // Add the watcher to the WatchersForTemplateClass + holder.add(watcher, template); + } + + /** + * Return a <code>SortedSet</code> of all the + * <code>TransitionWatcher</code>s whose <code>isInterested</code> + * methods return <code>true</code> when asked about the specified + * visibility transition. + * <p> + * This method is thread safe. This call is guaranteed to check unremoved + * watchers that were added by <code>add</code> calls that completed + * before this call started, even if the calls were made from + * different threads. Before the <code>isInterested</code> method + * of the first watcher is called the working memory of this thread + * will be flushed so any changes made to main memory before + * this call started will be visible. + * + * @param transition An <code>EntryTransition</code> that + * describes the transition and what + * entry is transitioning. This method + * will assume that <code>transition.getHandle</code> + * returns a non-null value. + * @param ordinal The ordinal associated with <code>transition</code>. + * @return A new <code>SortedSet</code> of all the + * <code>TransitionWatcher</code>s interested in the specified + * visibility transition. If none are interested an empty + * set will be returned. 
+ * @throws NullPointerException if <code>transition</code> is + * <code>null</code>. + */ + SortedSet<TransitionWatcher> allMatches(EntryTransition transition, long ordinal) { + final EntryRep rep = transition.getHandle().rep(); + final SortedSet<TransitionWatcher> rslt = new java.util.TreeSet<TransitionWatcher>(); + final String className = rep.classFor(); + WatchersForTemplateClass holder; + + /* Collect all the watchers looking for the exact class of the + * transitioned entry. + */ + holder = holders.get(className); + + if (holder != null) holder.collectInterested(rslt, transition, ordinal); + + // Get all the templates that are super classes of className + final String[] superclasses = rep.superclasses(); + for (int i=0; i<superclasses.length; i++) { + holder = holders.get(superclasses[i]); + if (holder != null) + holder.collectInterested(rslt, transition, ordinal); + } + + // Including those registered for the null template + final String nullClass = EntryRep.matchAnyEntryRep().classFor(); + holder = holders.get(nullClass); + if (holder!=null) holder.collectInterested(rslt, transition, ordinal); + return rslt; + } + + /** + * Visit each <code>TransitionWatcher</code> and check to see if + * it has expired, removing it if it has. + */ + void reap() { + final long now = System.currentTimeMillis(); + Iterator<WatchersForTemplateClass> watchers = holders.values().iterator(); + while (watchers.hasNext()){ + watchers.next().reap(now); + } + } + + /** + * Return the <code>OutriggerServerImpl</code> this + * <code>TransitionWatchers</code> object is part of. + * @return The <code>OutriggerServerImpl</code> this + * <code>TransitionWatchers</code> is part of. + */ + OutriggerServerImpl getServer() { + return server; + } + +}
