I am working on upgrading to TDB 0.9.2

An issue, (which may or may not be new) is that unlike GraphMem TDB will throw a concurrent modification from a write that is concurrent with a read. (The behavior I am expecting is for reads only to throw concurrent modifications).

I am currently modifying DatasetControlMRSW (my current version attached), using a copy of the source. I am wondering if there is, or could be, some extension point where I can add a factory for DatasetControl.

(Note the code to show extended stack traces showing both stack traces where there are conflicting reads and writes).

The actual policy I am using is:
(Single Writer or Multiple Readers) and Multiple Optimistic Readers - that recover from concurrent modifications.

Jeremy
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hp.hpl.jena.tdb.sys;

import static java.lang.String.format ;

import java.util.ConcurrentModificationException ;
import java.util.Iterator ;
import java.util.Map;
import java.util.NoSuchElementException ;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong ;

import org.openjena.atlas.iterator.Iter ;
import org.openjena.atlas.lib.Closeable ;

/** A policy that checks, but does not enforce, single writer or multiple 
writer locking policy */ 
public class DatasetControlMRSW implements DatasetControl
{
        private final boolean concurrencyChecking = true ;
        /**
         * Set this field to true to give longer stack-traces when
         * the policy is violated, which gives details of the two conflicting
         * operations. This is extremely slow since it creates an exception
         * on every operation just in case it conflicts with a later operation.
         */
    public final static boolean DEBUG_VERY_SLOW_MRSW = true;
    private RuntimeException writeOperation;
        private final Map<Long,RuntimeException> readOperations = new 
ConcurrentHashMap<Long,RuntimeException>();
        private final AtomicLong epoch = new AtomicLong(5) ;                // 
Update counters, used to check iterators. No need to start at 0.
    private final AtomicLong readCounter = new AtomicLong(0) ;
    private final AtomicLong writeCounter = new AtomicLong(0) ;
    private CountDownLatch correctionOfConcurrentModication = new 
CountDownLatch(0);
    
    public DatasetControlMRSW()
    { }

    @Override
    synchronized public void startRead()
    {
        try {
                        correctionOfConcurrentModication.await();
                } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                } finally {
            readCounter.getAndIncrement() ;
                }
        checkConcurrency(true) ;
    }

    @Override
    synchronized public void finishRead()
    {
        long R = readCounter.decrementAndGet() ;
        if (correctionOfConcurrentModication.getCount()>0) {
                correctionOfConcurrentModication.countDown();
                policyError(true,"A write occurred during this read.");
        }
        if (DEBUG_VERY_SLOW_MRSW) {
                RuntimeException rte = 
readOperations.remove(Thread.currentThread().getId());
                if (rte != null && rte.getCause()!=null) {
                        
readOperations.put(Thread.currentThread().getId(),(RuntimeException)rte.getCause());
                }
        }
    }

    @Override
    synchronized public void startUpdate()
    {
        epoch.getAndIncrement() ;
        writeCounter.getAndIncrement() ;
        checkConcurrency(false) ;
    }

    @Override
    public void finishUpdate()
    {
        writeCounter.decrementAndGet() ;
        
    }

    private void checkConcurrency(boolean read)
    {
        long R, W ;
        synchronized (this)
        {
                R = readCounter.get() ;
                W = writeCounter.get() ;

                try {
                        if ( R > 0 && W > 0 )
                                policyError(read, R, W) ;
                        if ( W > 1 )
                                policyError(read, R, W) ;
                }
                finally {
                        if (DEBUG_VERY_SLOW_MRSW) {
                                if (read) {
                                        final RuntimeException rte = new 
RuntimeException("A TDB read operation");
                                        RuntimeException old = 
readOperations.put(Thread.currentThread().getId(), rte);
                                        if (old!=null) {
                                                rte.initCause(old);
                                        }
                                } else {
                                        writeOperation = new 
RuntimeException("A TDB write operation");
                                }
                        }
                }
        }

    }
    
    @Override
    public <T> Iterator<T> iteratorControl(Iterator<T> iter) { return new 
IteratorCheckNotConcurrent<T>(iter, epoch) ; }
    
    private class IteratorCheckNotConcurrent<T> implements Iterator<T>, 
Closeable
    {
        private Iterator<T> iter ;
        private AtomicLong eCount ;
        private boolean finished = false ;
        private long startEpoch ; 

        IteratorCheckNotConcurrent(Iterator<T> iter, AtomicLong eCount )
        {
            // Assumes correct locking to set up, i.e. eCount not changing 
(writer on separate thread).
            this.iter = iter ;
            this.eCount = eCount ;
            this.startEpoch = eCount.get();
        }

        private void checkCourrentModification()
        {
            if ( finished )
                return ;
            
            long now = eCount.get() ;
            if ( now != startEpoch )
            {
                policyError(true, format("Iterator: started at %d, now %d", 
startEpoch, now)) ;

            }
        }
        
        @Override
        public boolean hasNext()
        {
            checkCourrentModification() ;
            boolean b = iter.hasNext() ;
            if ( ! b )
                close() ;
            return b ;
        }

        @Override
        public T next()
        {
            checkCourrentModification() ;
            try { 
                return iter.next();
            } catch (NoSuchElementException ex) { close() ; throw ex ; }
        }

        @Override
        public void remove()
        {
            checkCourrentModification() ;
            iter.remove() ;
        }

        @Override
        public void close()
        {
            finished = true ;
            Iter.close(iter) ;
        }
    }

    
    private void policyError(boolean read, long R, long W)
    {
        if (!read && R>0 && W==1) {
                correctionOfConcurrentModication = new CountDownLatch((int)R);
        } else {
           policyError(read || W>1, format("Reader = %d, Writer = %d", R, W)) ;
        }
    }
    
    private void policyError(boolean read, String message)
    {
        final RuntimeException ex = new 
ConcurrentModificationException(message);
        if (DEBUG_VERY_SLOW_MRSW) {
                if (read) {
                        if ( writeOperation != null ) {
                                ex.initCause(writeOperation);
                        } 
                } else {
                        Iterator<RuntimeException> it = 
readOperations.values().iterator();
                        if (it.hasNext()) {
                                ex.initCause(it.next());
                        } else if ( writeCounter.get() > 1 ) {
                                ex.initCause(writeOperation);
                        }
                }
        }
                throw ex ;
    }
    

}

Reply via email to