GEODE-544: Removes soplog code and tests The "soplog" code was a partial implementation of a concurrent LSM tree stored on local disk. This is not currently used anywhere so is being cleaned up. The interfaces used by the HDFS feature have not been deleted.
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/f95eb683 Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/f95eb683 Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/f95eb683 Branch: refs/heads/feature/GEODE-544 Commit: f95eb6832bc834c4b157553bc43fb1c30631cc4c Parents: e1eb74e Author: Anthony Baker <[email protected]> Authored: Thu Nov 12 14:39:48 2015 -0800 Committer: Anthony Baker <[email protected]> Committed: Thu Nov 12 14:39:48 2015 -0800 ---------------------------------------------------------------------- .../persistence/soplog/AbstractCompactor.java | 533 ------------- .../soplog/AbstractKeyValueIterator.java | 76 -- .../soplog/AbstractSortedReader.java | 135 ---- .../soplog/ArraySerializedComparator.java | 144 ---- .../cache/persistence/soplog/Compactor.java | 174 ----- .../soplog/CompositeSerializedComparator.java | 57 -- .../soplog/IndexSerializedComparator.java | 127 --- .../cache/persistence/soplog/LevelTracker.java | 120 --- .../soplog/LexicographicalComparator.java | 460 ----------- .../cache/persistence/soplog/NonCompactor.java | 110 --- .../soplog/ReversingSerializedComparator.java | 67 -- .../persistence/soplog/SizeTieredCompactor.java | 198 ----- .../cache/persistence/soplog/SoplogToken.java | 116 --- .../cache/persistence/soplog/SortedBuffer.java | 367 --------- .../cache/persistence/soplog/SortedOplog.java | 158 ---- .../persistence/soplog/SortedOplogFactory.java | 278 ------- .../persistence/soplog/SortedOplogSet.java | 118 --- .../persistence/soplog/SortedOplogSetImpl.java | 780 ------------------- .../soplog/hfile/BlockCacheHolder.java | 39 - .../soplog/hfile/HFileSortedOplog.java | 694 ----------------- .../soplog/hfile/HFileSortedOplogFactory.java | 80 -- .../soplog/nofile/NoFileSortedOplog.java | 244 ------ .../soplog/nofile/NoFileSortedOplogFactory.java | 41 - .../cache/persistence/soplog/AppendLog.java | 65 -- .../ArraySerializedComparatorJUnitTest.java | 95 --- .../CompactionSortedOplogSetTestCase.java | 134 ---- .../persistence/soplog/CompactionTestCase.java | 206 ----- .../persistence/soplog/ComparisonTestCase.java | 77 -- .../soplog/IndexComparatorJUnitTest.java | 79 -- .../LexicographicalComparatorJUnitTest.java | 204 ----- .../soplog/RecoverableSortedOplogSet.java | 221 ------ .../soplog/SizeTieredCompactorJUnitTest.java | 110 --- .../SizeTieredSortedOplogSetJUnitTest.java | 43 - .../soplog/SortedBufferJUnitTest.java | 39 - .../soplog/SortedOplogSetJUnitTest.java | 273 ------- .../soplog/SortedReaderTestCase.java | 295 ------- .../nofile/NoFileSortedOplogJUnitTest.java | 48 -- 37 files changed, 7005 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java deleted file mode 100644 index 0b62313..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractCompactor.java +++ /dev/null @@ -1,533 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.File; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collection; -import java.util.EnumMap; -import java.util.Iterator; -import java.util.List; -import java.util.Queue; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogSetImpl.MergedIterator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.util.AbortableTaskService; -import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask; - -public abstract class AbstractCompactor<T extends Comparable<T>> implements Compactor { - protected static final Logger logger = LogService.getLogger(); - - /** the soplog factory */ - protected final SortedOplogFactory factory; - - /** the fileset */ - protected final Fileset<T> fileset; - - /** the soplog tracker */ - protected final CompactionTracker<T> tracker; - - /** thread for background compaction */ - protected final AbortableTaskService compactor; - - /** inactive files waiting to be deleted */ - private final Queue<TrackedReference<SortedOplogReader>> inactive; - - /** the soplogs */ - protected final List<Level> levels; - - /** provides consistent view of all levels */ - private final ReadWriteLock levelLock; - - /** test flag to abort compaction */ - volatile boolean testAbortDuringCompaction; - - /** test flag to delay compaction */ - volatile CountDownLatch testDelayDuringCompaction; - - protected final String logPrefix; - - public AbstractCompactor(SortedOplogFactory factory, - Fileset<T> fileset, CompactionTracker<T> tracker, - Executor exec) { - assert factory != null; - assert fileset != null; - assert tracker != null; - assert exec != null; - - this.factory = factory; - this.fileset = fileset; - this.tracker = tracker; - - compactor = new AbortableTaskService(exec); - inactive = new ConcurrentLinkedQueue<TrackedReference<SortedOplogReader>>(); - - levelLock = new ReentrantReadWriteLock(); - levels = new ArrayList<Level>(); - - this.logPrefix = "<" + factory.getConfiguration().getName() + "> "; - } - - @Override - public final void add(SortedOplog soplog) throws IOException { - levels.get(0).add(soplog); - } - - @Override - public final boolean compact() throws IOException { - final CountDownLatch done = new CountDownLatch(1); - final AtomicReference<Object> result = new AtomicReference<Object>(null); - - compact(true, new CompactionHandler() { - @Override - public void complete(boolean compacted) { - result.set(compacted); - done.countDown(); - } - - @Override - public void failed(Throwable ex) { - result.set(ex); - done.countDown(); - } - }); - - try { - done.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new InterruptedIOException(); - } - - Object val = result.get(); - if (val instanceof Throwable) { - throw new IOException((Throwable) val); - } - - assert val != null; - return (Boolean) val; - } - - @Override - public final void compact(final boolean force, final CompactionHandler ch) { - // TODO implement force=true, results in a single soplog - AbortableTask task = new AbortableTask() { - @Override - public void runOrAbort(AtomicBoolean aborted) { - final boolean isDebugEnabled = logger.isDebugEnabled(); - if (isDebugEnabled) { - logger.debug("{}Beginning compaction", AbstractCompactor.this.logPrefix); - } - - // TODO could do this in one go instead of level-by-level - try { - boolean compacted = false; - for (Level level : levels) { - if (aborted.get()) { - if (isDebugEnabled) { - logger.debug("{}Aborting compaction", AbstractCompactor.this.logPrefix); - } - break; - } - - checkTestDelay(); - if (force || level.needsCompaction()) { - if (isDebugEnabled) { - logger.debug("{}Compacting level {}", AbstractCompactor.this.logPrefix, level); - } - - long start = factory.getConfiguration().getStatistics().getMinorCompaction().begin(); - try { - compacted |= level.compact(aborted); - factory.getConfiguration().getStatistics().getMinorCompaction().end(start); - - } catch (IOException e) { - factory.getConfiguration().getStatistics().getMinorCompaction().error(start); - } - } - } - - cleanupInactive(); - if (ch != null) { - if (isDebugEnabled) { - logger.debug("{}Completed compaction", AbstractCompactor.this.logPrefix); - } - ch.complete(compacted); - } - } catch (Exception e) { - if (isDebugEnabled) { - logger.debug("{}Encountered an error during compaction", AbstractCompactor.this.logPrefix, e); - } - if (ch != null) { - ch.failed(e); - } - } - } - - @Override - public void abortBeforeRun() { - if (ch != null) { - ch.complete(false); - } - } - }; - compactor.execute(task); - } - - @Override - public final CompactionTracker<?> getTracker() { - return tracker; - } - - @Override - public final Fileset<?> getFileset() { - return fileset; - } - - @Override - public final Collection<TrackedReference<SortedOplogReader>> getActiveReaders( - byte[] start, byte[] end) { - - // need to coordinate with clear() so we can get a consistent snapshot - // across levels - levelLock.readLock().lock(); - try { - // TODO this seems very garbage-y - List<TrackedReference<SortedOplogReader>> soplogs = new ArrayList<TrackedReference<SortedOplogReader>>(); - for (Level level : levels) { - soplogs.addAll(level.getSnapshot(start, end)); - } - return soplogs; - } finally { - levelLock.readLock().unlock(); - } - } - - @Override - public final void clear() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Clearing compactor", this.logPrefix); - } - - compactor.abortAll(); - releaseTestDelay(); - compactor.waitForCompletion(); - - levelLock.writeLock().lock(); - try { - for (Level l : levels) { - l.clear(); - } - } finally { - levelLock.writeLock().unlock(); - } - - cleanupInactive(); - } - - @Override - public final void close() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Closing compactor", this.logPrefix); - } - - compactor.abortAll(); - releaseTestDelay(); - compactor.waitForCompletion(); - - levelLock.writeLock().lock(); - try { - for (Level l : levels) { - l.close(); - } - } finally { - levelLock.writeLock().unlock(); - } - - TrackedReference<SortedOplogReader> tr; - while ((tr = inactive.poll()) != null) { - deleteInactive(tr); - } - inactive.clear(); - } - - /** - * Creates a new soplog by merging the supplied soplog readers. - * - * @param readers the readers to merge - * @param collect true if deleted entries should be removed - * @return the merged soplog - * - * @throws IOException error during merge operation - */ - protected SortedOplog merge( - Collection<TrackedReference<SortedOplogReader>> readers, - boolean collect, - AtomicBoolean aborted) throws IOException { - - SerializedComparator sc = null; - List<SortedIterator<ByteBuffer>> iters = new ArrayList<SortedIterator<ByteBuffer>>(); - for (TrackedReference<SortedOplogReader> tr : readers) { - iters.add(tr.get().scan()); - sc = tr.get().getComparator(); - } - - SortedIterator<ByteBuffer> scan = new MergedIterator(sc, readers, iters); - try { - if (!scan.hasNext()) { - checkAbort(aborted); - if (logger.isDebugEnabled()) { - logger.debug("{}No entries left after compaction with readers {} ", this.logPrefix, readers); - } - return null; - } - - File f = fileset.getNextFilename(); - if (logger.isDebugEnabled()) { - logger.debug("{}Compacting soplogs {} into {}", this.logPrefix, readers, f); - } - - if (testAbortDuringCompaction) { - aborted.set(true); - } - - SortedOplog soplog = factory.createSortedOplog(f); - SortedOplogWriter wtr = soplog.createWriter(); - try { - while (scan.hasNext()) { - checkAbort(aborted); - scan.next(); - if (!(collect && isDeleted(scan.value()))) { - wtr.append(scan.key(), scan.value()); - } - } - - EnumMap<Metadata, byte[]> metadata = mergeMetadata(readers); - wtr.close(metadata); - return soplog; - - } catch (IOException e) { - wtr.closeAndDelete(); - throw e; - } - } finally { - scan.close(); - } - } - - protected EnumMap<Metadata, byte[]> mergeMetadata( - Collection<TrackedReference<SortedOplogReader>> readers) - throws IOException { - // merge the metadata into the compacted file - EnumMap<Metadata, byte[]> metadata = new EnumMap<Metadata, byte[]>(Metadata.class); - for (Metadata meta : Metadata.values()) { - byte[] val = null; - for (TrackedReference<SortedOplogReader> tr : readers) { - byte[] tmp = tr.get().getMetadata(meta); - if (val == null) { - val = tmp; - - } else if (tmp != null) { - val = factory.getConfiguration().getMetadataCompactor(meta).compact(val, tmp); - } - } - if (val != null) { - metadata.put(meta, val); - } - } - return metadata; - } - - protected void releaseTestDelay() { - if (testDelayDuringCompaction != null) { - if (logger.isDebugEnabled()) { - logger.debug("{}Releasing testDelayDuringCompaction", this.logPrefix); - } - testDelayDuringCompaction.countDown(); - } - } - - protected void checkTestDelay() { - if (testDelayDuringCompaction != null) { - try { - if (logger.isDebugEnabled()) { - logger.debug("{}Waiting for testDelayDuringCompaction", this.logPrefix); - } - testDelayDuringCompaction.await(); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - - /** - * Returns the number of inactive readers. - * @return the inactive readers - */ - protected int countInactiveReaders() { - return inactive.size(); - } - - /** - * Returns the requested level for testing purposes. - * @param level the level ordinal - * @return the level - */ - protected Level getLevel(int level) { - return levels.get(level); - } - - protected void cleanupInactive() throws IOException { - for (Iterator<TrackedReference<SortedOplogReader>> iter = inactive.iterator(); iter.hasNext(); ) { - TrackedReference<SortedOplogReader> tr = iter.next(); - if (!tr.inUse() && inactive.remove(tr)) { - deleteInactive(tr); - } - } - } - - protected void markAsInactive(Iterable<TrackedReference<SortedOplogReader>> snapshot, T attach) throws IOException { - final boolean isDebugEnabled = logger.isDebugEnabled(); - for (Iterator<TrackedReference<SortedOplogReader>> iter = snapshot.iterator(); iter.hasNext(); ) { - TrackedReference<SortedOplogReader> tr = iter.next(); - if (isDebugEnabled) { - logger.debug("{}Marking {} as inactive", this.logPrefix, tr); - } - - inactive.add(tr); - tracker.fileRemoved(tr.get().getFile(), attach); - - factory.getConfiguration().getStatistics().incActiveFiles(-1); - factory.getConfiguration().getStatistics().incInactiveFiles(1); - } - } - - private boolean isDeleted(ByteBuffer value) { - //first byte determines the value type - byte valType = value.get(value.position()); - return SoplogToken.isTombstone(valType) || SoplogToken.isRemovedPhase2(valType); - } - - private void checkAbort(AtomicBoolean aborted) - throws InterruptedIOException { - if (aborted.get()) { - throw new InterruptedIOException(); - } - } - - private void deleteInactive(TrackedReference<SortedOplogReader> tr) - throws IOException { - tr.get().close(); - if (tr.get().getFile().delete()) { - if (logger.isDebugEnabled()) { - logger.debug("{}Deleted inactive soplog {}", this.logPrefix, tr.get().getFile()); - } - - tracker.fileDeleted(tr.get().getFile()); - factory.getConfiguration().getStatistics().incInactiveFiles(-1); - } - } - - /** - * Organizes a set of soplogs for a given level. - */ - protected static abstract class Level { - /** the level ordinal position */ - protected final int level; - - public Level(int level) { - this.level = level; - } - - @Override - public String toString() { - return String.valueOf(level); - } - - /** - * Returns true if the level needs compaction. - * @return true if compaction is needed - */ - protected abstract boolean needsCompaction(); - - /** - * Obtains the current set of active soplogs for this level. - * @return the soplog snapshot - */ - protected List<TrackedReference<SortedOplogReader>> getSnapshot() { - return getSnapshot(null, null); - } - - /** - * Obtains the current set of active soplogs for this level, optionally - * bounded by the start and end keys. - * - * @param start the start key - * @param end the end key - * @return the soplog snapshot - */ - protected abstract List<TrackedReference<SortedOplogReader>> getSnapshot(byte[] start, byte[] end); - - /** - * Clears the soplogs that match the metadata filter. - * @throws IOException error during close - */ - protected abstract void clear() throws IOException; - - /** - * Closes the soplogs managed by this level. - * @throws IOException error closing soplogs - */ - protected abstract void close() throws IOException; - - /** - * Adds a new soplog to this level. - * - * @param soplog the soplog - * @throws IOException error creating reader - */ - protected abstract void add(SortedOplog soplog) throws IOException; - - /** - * Merges the current soplogs into a new soplog and promotes it to the next - * level. The previous soplogs are marked for deletion. - * - * @param aborted true if the compaction should be aborted - * @throws IOException error unable to perform compaction - */ - protected abstract boolean compact(AtomicBoolean aborted) throws IOException; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java deleted file mode 100644 index 1326d5c..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractKeyValueIterator.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.util.Iterator; -import java.util.NoSuchElementException; - -/** - * Provides an {@link Iterator} view over a collection of keys and values. The - * implementor must provide access to the current key/value as well as a means - * to move to the next pair. - * - * @author bakera - * - * @param <K> the key type - * @param <V> the value type - */ -public abstract class AbstractKeyValueIterator<K, V> implements KeyValueIterator<K, V> { - /** true if the iterator has been advanced to the next element */ - private boolean foundNext = false; - - @Override - public boolean hasNext() { - if (!foundNext) { - foundNext = step(); - } - return foundNext; - } - - @Override - public K next() { - if (!foundNext && !step()) { - throw new NoSuchElementException(); - } - - foundNext = false; - return key(); - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - /** - * Returns the key at the current position. - * @return the key - */ - public abstract K key(); - - /** - * Returns the value at the current position. - * @return the value - */ - public abstract V value(); - - /** - * Steps the iteration to the next position. - * @return true if the step succeeded - */ - protected abstract boolean step(); -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java deleted file mode 100644 index c11e1e0..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/AbstractSortedReader.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.IOException; -import java.nio.ByteBuffer; - - -/** - * Provides default behavior for range scans. - * - * @author bakera - */ -public abstract class AbstractSortedReader implements SortedReader<ByteBuffer> { - @Override - public final SortedIterator<ByteBuffer> scan() throws IOException { - return scan(null, true, null, true); - } - - @Override - public final SortedIterator<ByteBuffer> head(byte[] to, boolean inclusive) throws IOException{ - return scan(null, true, to, inclusive); - } - - @Override - public final SortedIterator<ByteBuffer> tail(byte[] from, boolean inclusive) throws IOException{ - return scan(from, inclusive, null, true); - } - - @Override - public final SortedIterator<ByteBuffer> scan(byte[] from, byte[] to) throws IOException{ - return scan(from, true, to, false); - } - - @Override - public final SortedIterator<ByteBuffer> scan(byte[] equalTo) throws IOException{ - return scan(equalTo, true, equalTo, true); - } - - @Override - public SortedIterator<ByteBuffer> scan(byte[] from, boolean fromInclusive, byte[] to, - boolean toInclusive) throws IOException{ - return scan(from, fromInclusive, to, toInclusive, true, null); - } - - @Override - public final SortedReader<ByteBuffer> withAscending(boolean ascending) { - if (this instanceof DelegateSortedReader) { - DelegateSortedReader tmp = (DelegateSortedReader) this; - return new DelegateSortedReader(tmp.delegate, ascending, tmp.filter); - } - return new DelegateSortedReader(this, ascending, null); - } - - @Override - public final SortedReader<ByteBuffer> withFilter(MetadataFilter filter) { - if (this instanceof DelegateSortedReader) { - DelegateSortedReader tmp = (DelegateSortedReader) this; - return new DelegateSortedReader(tmp.delegate, tmp.ascending, filter); - } - return new DelegateSortedReader(this, true, filter); - } - - protected class DelegateSortedReader extends AbstractSortedReader { - /** the embedded reader */ - private final AbstractSortedReader delegate; - - /** true if ascending */ - private final boolean ascending; - - /** the filter */ - private final MetadataFilter filter; - - public DelegateSortedReader(AbstractSortedReader reader, boolean ascending, MetadataFilter filter) { - this.delegate = reader; - this.ascending = ascending; - this.filter = filter; - } - - @Override - public boolean mightContain(byte[] key) throws IOException { - return delegate.mightContain(key); - } - - @Override - public ByteBuffer read(byte[] key) throws IOException { - return delegate.read(key); - } - - @Override - public SerializedComparator getComparator() { - return delegate.getComparator(); - } - - @Override - public SortedStatistics getStatistics() throws IOException { - return delegate.getStatistics(); - } - - @Override - public void close() throws IOException { - delegate.close(); - } - - @Override - public SortedIterator<ByteBuffer> scan( - byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive) throws IOException { - return scan(from, fromInclusive, to, toInclusive, ascending, filter); - } - - @Override - public SortedIterator<ByteBuffer> scan( - byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive, - boolean ascending, - MetadataFilter filter) throws IOException { - return delegate.scan(from, fromInclusive, to, toInclusive, ascending, filter); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java deleted file mode 100644 index 139b3cb..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ArraySerializedComparator.java +++ /dev/null @@ -1,144 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.nio.ByteBuffer; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; -import com.gemstone.gemfire.internal.util.Bytes; - -/** - * Provides comparisons of composite keys by comparing each of the constituent - * parts of the key in order. A subkey will only be evaluated if the preceeding - * keys have compared as equal. - * <p> - * Prior to use, an instance must be configured with the ordered list of - * comparators to apply. - * <p> - * The keys for an N-composite are stored as follows: - * <pre> - * | len[0] | key[0] | len[1] | key[1] | ... | len[N-2] | key[N-2] | key[N-1] | - * </pre> - * where the key length is stored as a protobuf varint. - * - * @author bakera - */ -public class ArraySerializedComparator implements CompositeSerializedComparator, -DelegatingSerializedComparator { - - /** the comparators */ - private volatile SerializedComparator[] comparators; - - /** - * Injects the comparators to be used on composite keys. The number and order - * must match the key. - * - * @param comparators the comparators - */ - public void setComparators(SerializedComparator[] comparators) { - this.comparators = comparators; - } - - @Override - public int compare(byte[] o1, byte[] o2) { - return compare(o1, 0, o1.length, o2, 0, o2.length); - } - - @Override - public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) { - SerializedComparator[] sc = comparators; - - int off1 = o1; - int off2 = o2; - for (int i = 0; i < sc.length - 1; i++) { - int klen1 = Bytes.getVarInt(b1, off1); - int klen2 = Bytes.getVarInt(b2, off2); - - off1 += Bytes.sizeofVarInt(klen1); - off2 += Bytes.sizeofVarInt(klen2); - - if (!SoplogToken.isWildcard(b1, off1, b2, off2)) { - int diff = sc[i].compare(b1, off1, klen1, b2, off2, klen2); - if (diff != 0) { - return diff; - } - } - off1 += klen1; - off2 += klen2; - } - - if (!SoplogToken.isWildcard(b1, off1, b2, off2)) { - l1 -= (off1 - o1); - l2 -= (off2 - o2); - return sc[sc.length - 1].compare(b1, off1, l1, b2, off2, l2); - } - return 0; - } - - @Override - public SerializedComparator[] getComparators() { - return comparators; - } - - @Override - public byte[] createCompositeKey(byte[] key1, byte[] key2) { - return createCompositeKey(new byte[][] { key1, key2 }); - } - - @Override - public byte[] createCompositeKey(byte[]... keys) { - assert comparators.length == keys.length; - - int size = 0; - for (int i = 0; i < keys.length - 1; i++) { - size += keys[i].length + Bytes.sizeofVarInt(keys[i].length); - } - size += keys[keys.length - 1].length; - - // TODO do we have to do a copy here or can we delay until the disk write? - int off = 0; - byte[] buf = new byte[size]; - for (int i = 0; i < keys.length - 1; i++) { - off = Bytes.putVarInt(keys[i].length, buf, off); - System.arraycopy(keys[i], 0, buf, off, keys[i].length); - off += keys[i].length; - } - System.arraycopy(keys[keys.length - 1], 0, buf, off, keys[keys.length - 1].length); - return buf; - } - - @Override - public ByteBuffer getKey(ByteBuffer key, int ordinal) { - assert ordinal < comparators.length; - - for (int i = 0; i < comparators.length - 1; i++) { - int klen = Bytes.getVarInt(key); - if (i == ordinal) { - ByteBuffer subkey = (ByteBuffer) key.slice().limit(klen); - key.rewind(); - - return subkey; - } - key.position(key.position() + klen); - } - - ByteBuffer subkey = key.slice(); - key.rewind(); - - return subkey; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java deleted file mode 100644 index c80f118..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/Compactor.java +++ /dev/null @@ -1,174 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.File; -import java.io.IOException; -import java.util.Collection; -import java.util.SortedMap; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader; - -/** - * Defines a mechanism to track and compact soplogs. - * - * @author bakera - */ -public interface Compactor { - /** - * Compares metadata values. - */ - public interface MetadataCompactor { - /** - * Combines two metadata values into a single value. Used during compaction - * to merge metadata between soplog files. - * - * @param metadata1 the first value - * @param metadata2 the second value - * @return the combined metadata - */ - byte[] compact(byte[] metadata1, byte[] metadata2); - } - - /** - * Provides notification on the status of a compaction. - */ - public interface CompactionHandler { - /** - * Invoked when a compaction operation has completed successfully. - * @param compacted true if any files were compacted - */ - void complete(boolean compacted); - - /** - * Invoked when a compaction operation has failed. - * @param ex the failure - */ - void failed(Throwable ex); - } - - /** - * Provides external configuration of file operations for recovering and - * new file creation. - * - * @param <T> the compaction info - */ - public interface Fileset<T extends Comparable<T>> { - /** - * Returns the set of active soplogs. - * @return the active files - */ - SortedMap<T, ? extends Iterable<File>> recover(); - - /** - * Returns the pathname for the next soplog. - * @return the soplog filename - */ - File getNextFilename(); - } - - /** - * Provides a mechanism to coordinate file changes to the levels managed - * by the compactor. - * - * @param T the attachment type - */ - public interface CompactionTracker<T extends Comparable<T>> { - /** - * Invoked when a new file is added. - * @param f the file - * @param attach the attachment - */ - void fileAdded(File f, T attach); - - /** - * Invoked when a file is removed. - * @param f the file - * @param attach the attachment - */ - void fileRemoved(File f, T attach); - - /** - * Invoked when a file is deleted. - * @param f the attachment - */ - void fileDeleted(File f); - } - - /** - * Synchronously invokes the force compaction operation and waits for completion. - * - * @return true if any files were compacted - * @throws IOException error during compaction - */ - boolean compact() throws IOException; - - /** - * Requests a compaction operation be performed on the soplogs. This invocation - * may block if there are too many outstanding write requests. - * - * @param force if false, compaction will only be performed if necessary - * @param ch invoked when the compaction is complete, optionally null - * @throws IOException error during compaction - */ - void compact(boolean force, CompactionHandler ch); - - /** - * Returns the active readers for the given key range. The caller is responsible - * for decrementing the use count of each reader when finished. - * - * @param start the start key inclusive, or null for beginning - * @param end the end key inclusive, or null for last - * @return the readers - * - * @see TrackedReference - */ - Collection<TrackedReference<SortedOplogReader>> getActiveReaders( - byte[] start, byte[] end); - - /** - * Adds a new soplog to the active set. - * @param soplog the soplog - * @throws IOException unable to add soplog - */ - void add(SortedOplog soplog) throws IOException; - - /** - * Returns the compaction tracker for coordinating changes to the file set. - * @return the tracker - */ - CompactionTracker<?> getTracker(); - - /** - * Returns the file manager for managing the soplog files. - * @return the fileset - */ - Fileset<?> getFileset(); - - /** - * Clears the active files managed by the compactor. Files will be marked as - * inactive and eventually deleted. - * - * @throws IOException unable to clear - */ - void clear() throws IOException; - /** - * Closes the compactor. - * @throws IOException unable to close - */ - void close() throws IOException; -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java deleted file mode 100644 index 8d9aae5..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/CompositeSerializedComparator.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.nio.ByteBuffer; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; - -/** - * Creates and compares composite keys. - * - * @author bakera - */ -public interface CompositeSerializedComparator extends SerializedComparator { - /** - * Constructs a composite key consisting of a primary key and a secondary key. - * - * @param key1 the primary key - * @param key2 the secondary key - * @return the composite key - */ - public byte[] createCompositeKey(byte[] key1, byte[] key2); - - /** - * Constructs a composite key by combining the supplied keys. The number of - * keys and their order must match the comparator set. - * <p> - * The <code>WILDCARD_KEY</code> token may be used to match all subkeys in the - * given ordinal position. This is useful when constructing a search key to - * retrieve all keys for a given primary key, ignoring the remaining subkeys. - * - * @param keys the keys, ordered by sort priority - * @return the composite key - */ - public byte[] createCompositeKey(byte[]... keys); - - /** - * Returns subkey for the given ordinal position. - * @param key the composite key - * @return the subkey - */ - public ByteBuffer getKey(ByteBuffer key, int ordinal); -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java deleted file mode 100644 index 816eea0..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/IndexSerializedComparator.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.nio.ByteBuffer; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; -import com.gemstone.gemfire.internal.util.Bytes; - -/** - * Provides a comparator for composite keys of the form (k0, k1). The primary - * keys are compared lexicographically while the secondary keys are compared - * bitwise. The key format includes the primary key length to avoid deserialization - * the secondary key when reading: - * <pre> - * | varint | primary key | secondary key | - * </pre> - * The key length is encoded using a protobuf-style varint. - * <p> - * - * @author bakera - */ -public class IndexSerializedComparator implements CompositeSerializedComparator, -DelegatingSerializedComparator { - - private volatile SerializedComparator primary; - private volatile SerializedComparator secondary; - - public IndexSerializedComparator() { - primary = new LexicographicalComparator(); - secondary = new ByteComparator(); - } - - @Override - public void setComparators(SerializedComparator[] comparators) { - assert comparators.length == 2; - - primary = comparators[0]; - secondary = comparators[1]; - } - - @Override - public SerializedComparator[] getComparators() { - return new SerializedComparator[] { primary, secondary }; - } - - @Override - public int compare(byte[] o1, byte[] o2) { - return compare(o1, 0, o1.length, o2, 0, o2.length); - } - - @Override - public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) { - int klen1 = Bytes.getVarInt(b1, o1); - int klen2 = Bytes.getVarInt(b2, o2); - - int off1 = o1 + Bytes.sizeofVarInt(klen1); - int off2 = o2 + Bytes.sizeofVarInt(klen2); - - // skip the comparison operation if there is a SearchToken.WILDCARD - if (!SoplogToken.isWildcard(b1, off1, b2, off2)) { - int diff = primary.compare(b1, off1, klen1, b2, off2, klen2); - if (diff != 0) { - return diff; - } - } - off1 += klen1; - off2 += klen2; - - if (!SoplogToken.isWildcard(b1, off1, b2, off2)) { - l1 -= (off1 - o1); - l2 -= (off2 - o2); - return secondary.compare(b1, off1, l1, b2, off2, l2); - } - return 0; - } - - @Override - public ByteBuffer getKey(ByteBuffer key, int ordinal) { - assert ordinal < 2; - - ByteBuffer subkey; - int klen = Bytes.getVarInt(key); - if (ordinal == 0) { - subkey = (ByteBuffer) key.slice().limit(klen); - - } else { - subkey = ((ByteBuffer) key.position(key.position() + klen)).slice(); - } - - key.rewind(); - return subkey; - } - - @Override - public byte[] createCompositeKey(byte[] key1, byte[] key2) { - int vlen = Bytes.sizeofVarInt(key1.length); - byte[] buf = new byte[vlen + key1.length + key2.length]; - - Bytes.putVarInt(key1.length, buf, 0); - System.arraycopy(key1, 0, buf, vlen, key1.length); - System.arraycopy(key2, 0, buf, vlen + key1.length, key2.length); - - return buf; - } - - @Override - public byte[] createCompositeKey(byte[]... keys) { - assert keys.length == 2; - - return createCompositeKey(keys[0], keys[1]); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java deleted file mode 100644 index a590283..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LevelTracker.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.Closeable; -import java.io.File; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.LineNumberReader; -import java.io.Writer; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.CompactionTracker; -import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.Fileset; - -/** - * A simple, non-robust file tracker for tracking soplogs by level. - * - * @author bakera - */ -public class LevelTracker implements Fileset<Integer>, CompactionTracker<Integer>, Closeable { - private final String name; - private final File manifest; - - private final SortedMap<Integer, Set<File>> levels; - private final AtomicLong file; - - public LevelTracker(String name, File manifest) throws IOException { - this.name = name; - this.manifest = manifest; - file = new AtomicLong(0); - - levels = new TreeMap<Integer, Set<File>>(); - if (!manifest.exists()) { - return; - } - - LineNumberReader rdr = new LineNumberReader(new FileReader(manifest)); - try { - String line; - while ((line = rdr.readLine()) != null) { - String[] parts = line.split(","); - int level = Integer.parseInt(parts[0]); - File f = new File(parts[1]); - add(f, level); - } - } finally { - rdr.close(); - } - } - - @Override - public SortedMap<Integer, ? extends Iterable<File>> recover() { - return levels; - } - - @Override - public File getNextFilename() { - return new File(manifest.getParentFile(), name + "-" + System.currentTimeMillis() - + "-" + file.getAndIncrement() + ".soplog"); - } - - @Override - public void fileAdded(File f, Integer attach) { - add(f, attach); - } - - @Override - public void fileRemoved(File f, Integer attach) { - levels.get(attach).remove(f); - } - - @Override - public void fileDeleted(File f) { - } - - @Override - public void close() throws IOException { - Writer wtr = new FileWriter(manifest); - try { - for (Map.Entry<Integer, Set<File>> entry : levels.entrySet()) { - for (File f : entry.getValue()) { - wtr.write(entry.getKey() + "," + f + "\n"); - } - } - } finally { - wtr.flush(); - wtr.close(); - } - } - - private void add(File f, int level) { - Set<File> files = levels.get(level); - if (files == null) { - files = new HashSet<File>(); - levels.put(level, files); - } - files.add(f); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java deleted file mode 100644 index 24fba50..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/LexicographicalComparator.java +++ /dev/null @@ -1,460 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; - -import com.gemstone.gemfire.DataSerializer; -import com.gemstone.gemfire.internal.DSCODE; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; -import com.gemstone.gemfire.internal.util.Bytes; - -/** - * Provides type-optimized comparisons for serialized objects. All data is - * assumed to have been serialized via a call to - * {@link DataSerializer#writeObject(Object, java.io.DataOutput) }. The following - * data types have optimized comparisons: - * <ul> - * <li>boolean - * <li>byte - * <li>short - * <li>char - * <li>int - * <li>long - * <li>float - * <li>double - * <li>String (not {@link DSCODE#HUGE_STRING} or {@link DSCODE#HUGE_STRING_BYTES}) - * </ul> - * Types that are not listed above fallback to deserialization and comparison - * via the {@link Comparable} API. - * <p> - * Any numeric type may be compared against another numeric type (e.g. double - * to int). - * <p> - * <strong>Any changes to the serialized format may cause version incompatibilities. - * In addition, the comparison operations will need to be updated.</strong> - * <p> - * - * @author bakera - */ -public class LexicographicalComparator implements SerializedComparator { - - ////////////////////////////////////////////////////////////////////////////// - // - // constants for any-to-any numeric comparisons - // - ////////////////////////////////////////////////////////////////////////////// - - private static final int BYTE_TO_BYTE = DSCODE.BYTE << 8 | DSCODE.BYTE; - private static final int BYTE_TO_SHORT = DSCODE.BYTE << 8 | DSCODE.SHORT; - private static final int BYTE_TO_INT = DSCODE.BYTE << 8 | DSCODE.INTEGER; - private static final int BYTE_TO_LONG = DSCODE.BYTE << 8 | DSCODE.LONG; - private static final int BYTE_TO_FLOAT = DSCODE.BYTE << 8 | DSCODE.FLOAT; - private static final int BYTE_TO_DOUBLE = DSCODE.BYTE << 8 | DSCODE.DOUBLE; - - private static final int SHORT_TO_BYTE = DSCODE.SHORT << 8 | DSCODE.BYTE; - private static final int SHORT_TO_SHORT = DSCODE.SHORT << 8 | DSCODE.SHORT; - private static final int SHORT_TO_INT = DSCODE.SHORT << 8 | DSCODE.INTEGER; - private static final int SHORT_TO_LONG = DSCODE.SHORT << 8 | DSCODE.LONG; - private static final int SHORT_TO_FLOAT = DSCODE.SHORT << 8 | DSCODE.FLOAT; - private static final int SHORT_TO_DOUBLE = DSCODE.SHORT << 8 | DSCODE.DOUBLE; - - private static final int LONG_TO_BYTE = DSCODE.LONG << 8 | DSCODE.BYTE; - private static final int LONG_TO_SHORT = DSCODE.LONG << 8 | DSCODE.SHORT; - private static final int LONG_TO_INT = DSCODE.LONG << 8 | DSCODE.INTEGER; - private static final int LONG_TO_LONG = DSCODE.LONG << 8 | DSCODE.LONG; - private static final int LONG_TO_FLOAT = DSCODE.LONG << 8 | DSCODE.FLOAT; - private static final int LONG_TO_DOUBLE = DSCODE.LONG << 8 | DSCODE.DOUBLE; - - private static final int INT_TO_BYTE = DSCODE.INTEGER<< 8 | DSCODE.BYTE; - private static final int INT_TO_SHORT = DSCODE.INTEGER<< 8 | DSCODE.SHORT; - private static final int INT_TO_INT = DSCODE.INTEGER<< 8 | DSCODE.INTEGER; - private static final int INT_TO_LONG = DSCODE.INTEGER<< 8 | DSCODE.LONG; - private static final int INT_TO_FLOAT = DSCODE.INTEGER<< 8 | DSCODE.FLOAT; - private static final int INT_TO_DOUBLE = DSCODE.INTEGER<< 8 | DSCODE.DOUBLE; - - private static final int FLOAT_TO_BYTE = DSCODE.FLOAT << 8 | DSCODE.BYTE; - private static final int FLOAT_TO_SHORT = DSCODE.FLOAT << 8 | DSCODE.SHORT; - private static final int FLOAT_TO_INT = DSCODE.FLOAT << 8 | DSCODE.INTEGER; - private static final int FLOAT_TO_LONG = DSCODE.FLOAT << 8 | DSCODE.LONG; - private static final int FLOAT_TO_FLOAT = DSCODE.FLOAT << 8 | DSCODE.FLOAT; - private static final int FLOAT_TO_DOUBLE = DSCODE.FLOAT << 8 | DSCODE.DOUBLE; - - private static final int DOUBLE_TO_BYTE = DSCODE.DOUBLE << 8 | DSCODE.BYTE; - private static final int DOUBLE_TO_SHORT = DSCODE.DOUBLE << 8 | DSCODE.SHORT; - private static final int DOUBLE_TO_INT = DSCODE.DOUBLE << 8 | DSCODE.INTEGER; - private static final int DOUBLE_TO_LONG = DSCODE.DOUBLE << 8 | DSCODE.LONG; - private static final int DOUBLE_TO_FLOAT = DSCODE.DOUBLE << 8 | DSCODE.FLOAT; - private static final int DOUBLE_TO_DOUBLE = DSCODE.DOUBLE << 8 | DSCODE.DOUBLE; - - ////////////////////////////////////////////////////////////////////////////// - // - // constants for any-to-any string comparisons - // - ////////////////////////////////////////////////////////////////////////////// - - private static final int STRING_TO_STRING = DSCODE.STRING << 8 | DSCODE.STRING; - private static final int STRING_TO_STRING_BYTES = DSCODE.STRING << 8 | DSCODE.STRING_BYTES; - private static final int STRING_BYTES_TO_STRING = DSCODE.STRING_BYTES << 8 | DSCODE.STRING; - private static final int STRING_BYTES_TO_STRING_BYTES = DSCODE.STRING_BYTES << 8 | DSCODE.STRING_BYTES; - - @Override - public int compare(byte[] o1, byte[] o2) { - return compare(o1, 0, o1.length, o2, 0, o2.length); - } - - @Override - public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) { - byte type1 = b1[o1]; - byte type2 = b2[o2]; - - // optimized comparisons - if (isString(type1) && isString(type2)) { - return compareAsString(type1, b1, o1, type2, b2, o2); - - } else if (isNumeric(type1) && isNumeric(type2)) { - return compareAsNumeric(type1, b1, o1, type2, b2, o2); - - } else if (type1 == DSCODE.BOOLEAN && type2 == DSCODE.BOOLEAN) { - return compareAsBoolean(getBoolean(b1, o1), getBoolean(b2, o2)); - - } else if (type1 == DSCODE.CHARACTER && type2 == DSCODE.CHARACTER) { - return compareAsChar(getChar(b1, o1), getChar(b2, o2)); - - } else if (type1 == DSCODE.NULL || type2 == DSCODE.NULL) { - // null check, assumes NULLs sort last - return type1 == type2 ? 0 : type1 == DSCODE.NULL ? 1 : -1; - } - - // fallback, will deserialize to Comparable - return compareAsObject(b1, o1, l1, b2, o2, l2); - } - - private static boolean isNumeric(int type) { - return type == DSCODE.BYTE - || type == DSCODE.SHORT - || type == DSCODE.INTEGER - || type == DSCODE.LONG - || type == DSCODE.FLOAT - || type == DSCODE.DOUBLE; - } - - private static boolean isString(int type) { - return type == DSCODE.STRING - || type == DSCODE.STRING_BYTES; - } - - ////////////////////////////////////////////////////////////////////////////// - // - // type comparisons - // - ////////////////////////////////////////////////////////////////////////////// - - private static int compareAsString(byte type1, byte[] b1, int o1, byte type2, byte[] b2, int o2) { - // TODO these comparisons do not provide true alphabetical collation - // support (for example upper case sort before lower case). Need to use a - // collation key instead of unicode ordinal number comparison - switch (type1 << 8 | type2) { - case STRING_TO_STRING: - return compareAsStringOfUtf(b1, o1, b2, o2); - - case STRING_TO_STRING_BYTES: - return -compareAsStringOfByteToUtf(b2, o2, b1, o1); - - case STRING_BYTES_TO_STRING: - return compareAsStringOfByteToUtf(b1, o1, b2, o2); - - case STRING_BYTES_TO_STRING_BYTES: - return compareAsStringOfByte(b1, o1, b2, o2); - - default: - throw new ClassCastException(String.format("Incomparable types: %d %d", type1, type2)); - } - } - - private static int compareAsNumeric(byte type1, byte[] b1, int o1, byte type2, byte[] b2, int o2) { - switch (type1 << 8 | type2) { - case BYTE_TO_BYTE: return compareAsShort (getByte (b1, o1), getByte (b2, o2)); - case BYTE_TO_SHORT: return compareAsShort (getByte (b1, o1), getShort (b2, o2)); - case BYTE_TO_INT: return compareAsInt (getByte (b1, o1), getInt (b2, o2)); - case BYTE_TO_LONG: return compareAsLong (getByte (b1, o1), getLong (b2, o2)); - case BYTE_TO_FLOAT: return compareAsFloat (getByte (b1, o1), getFloat (b2, o2)); - case BYTE_TO_DOUBLE: return compareAsDouble(getByte (b1, o1), getDouble(b2, o2)); - - case SHORT_TO_BYTE: return compareAsShort (getShort (b1, o1), getByte (b2, o2)); - case SHORT_TO_SHORT: return compareAsShort (getShort (b1, o1), getShort (b2, o2)); - case SHORT_TO_INT: return compareAsInt (getShort (b1, o1), getInt (b2, o2)); - case SHORT_TO_LONG: return compareAsLong (getShort (b1, o1), getLong (b2, o2)); - case SHORT_TO_FLOAT: return compareAsFloat (getShort (b1, o1), getFloat (b2, o2)); - case SHORT_TO_DOUBLE: return compareAsDouble(getShort (b1, o1), getDouble(b2, o2)); - - case INT_TO_BYTE: return compareAsInt (getInt (b1, o1), getByte (b2, o2)); - case INT_TO_SHORT: return compareAsInt (getInt (b1, o1), getShort (b2, o2)); - case INT_TO_INT: return compareAsInt (getInt (b1, o1), getInt (b2, o2)); - case INT_TO_LONG: return compareAsLong (getInt (b1, o1), getLong (b2, o2)); - case INT_TO_FLOAT: return compareAsFloat (getInt (b1, o1), getFloat (b2, o2)); - case INT_TO_DOUBLE: return compareAsDouble(getInt (b1, o1), getDouble(b2, o2)); - - case LONG_TO_BYTE: return compareAsLong (getLong (b1, o1), getByte (b2, o2)); - case LONG_TO_SHORT: return compareAsLong (getLong (b1, o1), getShort (b2, o2)); - case LONG_TO_INT: return compareAsLong (getLong (b1, o1), getInt (b2, o2)); - case LONG_TO_LONG: return compareAsLong (getLong (b1, o1), getLong (b2, o2)); - case LONG_TO_FLOAT: return compareAsDouble(getLong (b1, o1), getFloat (b2, o2)); - case LONG_TO_DOUBLE: return compareAsDouble(getLong (b1, o1), getDouble(b2, o2)); - - case FLOAT_TO_BYTE: return compareAsFloat (getFloat (b1, o1), getByte (b2, o2)); - case FLOAT_TO_SHORT: return compareAsFloat (getFloat (b1, o1), getShort (b2, o2)); - case FLOAT_TO_INT: return compareAsFloat (getFloat (b1, o1), getInt (b2, o2)); - case FLOAT_TO_LONG: return compareAsFloat (getFloat (b1, o1), getLong (b2, o2)); - case FLOAT_TO_FLOAT: return compareAsFloat (getFloat (b1, o1), getFloat (b2, o2)); - case FLOAT_TO_DOUBLE: return compareAsDouble(getFloat (b1, o1), getDouble(b2, o2)); - - case DOUBLE_TO_BYTE: return compareAsDouble(getDouble(b1, o1), getByte (b2, o2)); - case DOUBLE_TO_SHORT: return compareAsDouble(getDouble(b1, o1), getShort (b2, o2)); - case DOUBLE_TO_INT: return compareAsDouble(getDouble(b1, o1), getInt (b2, o2)); - case DOUBLE_TO_LONG: return compareAsDouble(getDouble(b1, o1), getLong (b2, o2)); - case DOUBLE_TO_FLOAT: return compareAsDouble(getDouble(b1, o1), getFloat (b2, o2)); - case DOUBLE_TO_DOUBLE: return compareAsDouble(getDouble(b1, o1), getDouble(b2, o2)); - - default: - throw new ClassCastException(String.format("Incomparable types: %d %d", type1, type2)); - } - } - - private static int compareAsBoolean(boolean b1, boolean b2) { - return (b1 == b2) ? 0 : (b1 ? 1 : -1); - } - - private static int compareAsShort(short s1, short s2) { - return s1 - s2; - } - - private static int compareAsChar(char c1, char c2) { - // TODO non-collating sort - return c1 - c2; - } - - private static int compareAsInt(long l1, long l2) { - return (int) (l1 - l2); - } - - private static int compareAsLong(long l1, long l2) { - return (l1 < l2) ? -1 : ((l1 == l2) ? 0 : 1); - } - - private static int compareAsFloat(float f1, float f2) { - return Float.compare(f1, f2); - } - - private static int compareAsDouble(double d1, double d2) { - return Double.compare(d1, d2); - } - - private static int compareAsStringOfByte(byte[] b1, int o1, byte[] b2, int o2) { - int offset = 3; - int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]); - int l2 = Bytes.toUnsignedShort(b2[o1 + 1], b2[o1 + 2]); - - assert b1.length >= o1 + offset + l1; - assert b2.length >= o2 + offset + l2; - - int end = o1 + offset + Math.min(l1, l2); - for (int i = o1 + offset, j = o2 + offset; i < end; i++, j++) { - int diff = b1[i] - b2[j]; - if (diff != 0) { - return diff; - } - } - return l1 - l2; - } - - private static int compareAsStringOfUtf(byte[] b1, int o1, byte[] b2, int o2) { - int offset = 3; - int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]); - int l2 = Bytes.toUnsignedShort(b2[o1 + 1], b2[o1 + 2]); - - assert b1.length >= o1 + offset + l1; - assert b2.length >= o2 + offset + l2; - - int i = 0; - int j = 0; - while (i < l1 && j < l2) { - final int idx = o1 + offset + i; - final int ilen = getUtfLength(b1[idx]); - final char c1 = getUtfChar(b1, idx, ilen); - i += ilen; - - final int jdx = o2 + offset + j; - final int jlen = getUtfLength(b2[jdx]); - char c2 = getUtfChar(b2, jdx, jlen); - j += jlen; - - int diff = compareAsChar(c1, c2); - if (diff != 0) { - return diff; - } - } - return (l1 - i) - (l2 - j); - } - - private static int compareAsStringOfByteToUtf(byte[] b1, int o1, byte[] b2, int o2) { - int offset = 3; - int l1 = Bytes.toUnsignedShort(b1[o1 + 1], b1[o1 + 2]); - int l2 = Bytes.toUnsignedShort(b2[o1 + 1], b2[o1 + 2]); - - assert b1.length >= o1 + offset + l1; - assert b2.length >= o2 + offset + l2; - - int i = 0; - int j = 0; - while (i < l1 && j < l2) { - final int idx = o1 + offset + i; - final char c1 = (char) b1[idx]; - i++; - - final int jdx = o2 + offset + j; - final int jlen = getUtfLength(b2[jdx]); - char c2 = getUtfChar(b2, jdx, jlen); - j += jlen; - - int diff = compareAsChar(c1, c2); - if (diff != 0) { - return diff; - } - } - return (l1 - i) - (l2 - j); - } - - private static int compareAsObject(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) { - DataInput in1 = new DataInputStream(new ByteArrayInputStream(b1, o1, l1)); - DataInput in2 = new DataInputStream(new ByteArrayInputStream(b2, o2, l2)); - - try { - Comparable<Object> obj1 = DataSerializer.readObject(in1); - Comparable<Object> obj2 = DataSerializer.readObject(in2); - - return obj1.compareTo(obj2); - - } catch (Exception e) { - throw (RuntimeException) new ClassCastException().initCause(e); - } - } - - ////////////////////////////////////////////////////////////////////////////// - // - // - // Get a char from modified UTF8, as defined by DataInput.readUTF(). - // - ////////////////////////////////////////////////////////////////////////////// - - private static int getUtfLength(byte b) { - int c = b & 0xff; - - // 0xxxxxxx - if (c < 0x80) { - return 1; - - // 110xxxxx 10xxxxxx - } else if (c < 0xe0) { - return 2; - } - - // 1110xxxx 10xxxxxx 10xxxxxx - return 3; - } - - private static char getUtfChar(byte[] b, int off, int len) { - assert b.length >= off + len; - switch (len) { - case 1: - return (char) b[off]; - case 2: - return getUtf2(b, off); - case 3: - default: - return getUtf3(b, off); - } - } - - private static char getUtf2(byte[] b, int off) { - assert b.length >= off + 2; - assert (b[off] & 0xff) >= 0xc0; - assert (b[off] & 0xff) < 0xe0; - assert (b[off + 1] & 0xff) >= 0x80; - - return (char) (((b[off] & 0x1f) << 6) | (b[off + 1] & 0x3f)); - } - - private static char getUtf3(byte[] b, int off) { - assert b.length >= off + 3; - assert (b[off] & 0xff) >= 0xe0; - assert (b[off + 1] & 0xff) >= 0x80; - assert (b[off + 2] & 0xff) >= 0x80; - - return (char) (((b[off] & 0x0f) << 12) | ((b[off + 1] & 0x3f) << 6) | (b[off + 2] & 0x3f)); - } - - - ////////////////////////////////////////////////////////////////////////////// - // - // Get a serialized primitive from byte[]; b[0] is the DSCODE. - // - ////////////////////////////////////////////////////////////////////////////// - - private static boolean getBoolean(byte[] b, int off) { - assert b.length >= off + 2; - return b[off + 1] != 0; - } - - private static byte getByte(byte[] b, int off) { - assert b.length >= off + 2; - return b[off + 1]; - } - - private static short getShort(byte[] b, int off) { - assert b.length >= off + 3; - return Bytes.toShort(b[off + 1], b[off + 2]); - } - - private static char getChar(byte[] b, int off) { - assert b.length >= off + 3; - return Bytes.toChar(b[off + 1], b[off + 2]); - } - - private static int getInt(byte[] b, int off) { - assert b.length >= off + 5; - return Bytes.toInt(b[off + 1], b[off + 2], b[off + 3], b[off + 4]); - } - - private static long getLong(byte[] b, int off) { - assert b.length >= off + 9; - return Bytes.toLong(b[off + 1], b[off + 2], b[off + 3], b[off + 4], - b[off + 5], b[off + 6], b[off + 7], b[off + 8]); - } - - private static float getFloat(byte[] b, int off) { - assert b.length >= off + 5; - return Float.intBitsToFloat(getInt(b, off)); - } - - private static double getDouble(byte[] b, int off) { - assert b.length >= off + 9; - return Double.longBitsToDouble(getLong(b, off)); - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java deleted file mode 100644 index 697ac18..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/NonCompactor.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.atomic.AtomicLong; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader; - -/** - * Provides a compactor that does no compaction, primarily for testing purposes. - * - * @author bakera - */ -public class NonCompactor implements Compactor { - /** the fileset */ - private final Fileset<Integer> fileset; - - /** the current readers */ - private final Deque<TrackedReference<SortedOplogReader>> readers; - - public static Fileset<Integer> createFileset(final String name, final File dir) { - return new Fileset<Integer>() { - private final AtomicLong file = new AtomicLong(0); - - @Override - public SortedMap<Integer, ? extends Iterable<File>> recover() { - return new TreeMap<Integer, Iterable<File>>(); - } - - @Override - public File getNextFilename() { - return new File(dir, name + "-" + System.currentTimeMillis() + "-" - + file.getAndIncrement() + ".soplog"); - } - }; - } - public NonCompactor(String name, File dir) { - fileset = createFileset(name, dir); - readers = new ArrayDeque<TrackedReference<SortedOplogReader>>(); - } - - @Override - public boolean compact() throws IOException { - // liar! - return true; - } - - @Override - public void compact(boolean force, CompactionHandler cd) { - } - - @Override - public synchronized Collection<TrackedReference<SortedOplogReader>> getActiveReaders( - byte[] start, byte[] end) { - for (TrackedReference<SortedOplogReader> tr : readers) { - tr.increment(); - } - return new ArrayList<TrackedReference<SortedOplogReader>>(readers); - } - - @Override - public void add(SortedOplog soplog) throws IOException { - readers.addFirst(new TrackedReference<SortedOplogReader>(soplog.createReader())); - } - - @Override - public synchronized void clear() throws IOException { - for (TrackedReference<SortedOplogReader> tr : readers) { - tr.get().close(); - readers.remove(tr); - } - } - - @Override - public synchronized void close() throws IOException { - clear(); - } - - @Override - public CompactionTracker<Integer> getTracker() { - return null; - } - - @Override - public Fileset<Integer> getFileset() { - return fileset; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java deleted file mode 100644 index b18919d..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/ReversingSerializedComparator.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; - -/** - * Reverses the ordering imposed by the underlying comparator. Use this to - * change from an ascending to a descending order or vice versa. - * <p> - * Prior to use, an instance must be configured with a comparator for delegation - * of the comparison operations. - * - * @author bakera - */ -public class ReversingSerializedComparator implements DelegatingSerializedComparator { - private volatile SerializedComparator delegate; - - @Override - public void setComparators(SerializedComparator[] sc) { - assert sc.length == 0; - delegate = sc[0]; - } - - @Override - public SerializedComparator[] getComparators() { - return new SerializedComparator[] { delegate }; - } - - @Override - public int compare(byte[] o1, byte[] o2) { - return compare(o1, 0, o1.length, o2, 0, o2.length); - } - - @Override - public int compare(byte[] b1, int o1, int l1, byte[] b2, int o2, int l2) { - return delegate.compare(b2, o2, l2, b1, o1, l1); - } - - /** - * Returns a comparator that reverses the ordering imposed by the supplied - * comparator. - * - * @param sc the original comparator - * @return the reversed comparator - */ - public static SerializedComparator reverse(SerializedComparator sc) { - ReversingSerializedComparator rev = new ReversingSerializedComparator(); - rev.delegate = sc; - - return rev; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/f95eb683/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java deleted file mode 100644 index 5976ad0..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SizeTieredCompactor.java +++ /dev/null @@ -1,198 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.File; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Deque; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader; - -/** - * Implements a size-tiered compaction scheme in which the soplogs are organized - * by levels of increasing size. Each level is limited to a fixed number of - * files, <code>M</code>. Given an initial size of <code>N</code> the amount of - * disk space consumed by a level <code>L</code> is <code>M * N^(L+1)</code>. - * <p> - * During compaction, this approach will temporarily double the amount of space - * consumed by the level. Compactions are performed on a background thread. - * <p> - * Soplogs that have been compacted will be moved to the inactive list where they - * will be deleted once they are no longer in use. - * - * @author bakera - */ -public class SizeTieredCompactor extends AbstractCompactor<Integer> { - /** restricts the number of soplogs per level */ - private final int maxFilesPerLevel; - - // TODO consider relaxing the upper bound so the levels are created dynamically - /** restricts the number of levels; files in maxLevel are not compacted */ - private final int maxLevels; - - public SizeTieredCompactor(SortedOplogFactory factory, - Fileset<Integer> fileset, CompactionTracker<Integer> tracker, - Executor exec, int maxFilesPerLevel, int maxLevels) - throws IOException { - super(factory, fileset, tracker, exec); - - assert maxFilesPerLevel > 0; - assert maxLevels > 0; - - this.maxFilesPerLevel = maxFilesPerLevel; - this.maxLevels = maxLevels; - - final boolean isDebugEnabled = logger.isDebugEnabled(); - if (isDebugEnabled) { - logger.debug("{}Creating size-tiered compactor", super.logPrefix); - } - - for (int i = 0; i < maxLevels; i++) { - levels.add(new OrderedLevel(i)); - } - - for (Map.Entry<Integer, ? extends Iterable<File>> entry : fileset.recover().entrySet()) { - int level = Math.min(maxLevels - 1, entry.getKey()); - for (File f : entry.getValue()) { - if (isDebugEnabled) { - logger.debug("{}Adding {} to level {}", super.logPrefix, f, level); - } - levels.get(level).add(factory.createSortedOplog(f)); - } - } - } - - @Override - public String toString() { - return String.format("%s <%d/%d>", factory.getConfiguration().getName(), maxFilesPerLevel, maxLevels); - } - - /** - * Organizes a set of soplogs for a given level. All operations on the - * soplogs are synchronized via the instance monitor. - */ - protected class OrderedLevel extends Level { - /** the ordered set of soplog readers */ - private final Deque<TrackedReference<SortedOplogReader>> soplogs; - - /** true if the level is being compacted */ - private final AtomicBoolean isCompacting; - - public OrderedLevel(int level) { - super(level); - soplogs = new ArrayDeque<TrackedReference<SortedOplogReader>>(maxFilesPerLevel); - isCompacting = new AtomicBoolean(false); - } - - @Override - protected synchronized boolean needsCompaction() { - // TODO this is safe but overly conservative...we need to allow parallel - // compaction of a level such that we guarantee completion order and handle - // errors - return !isCompacting.get() - && soplogs.size() >= maxFilesPerLevel - && level != maxLevels - 1; - } - - @Override - protected List<TrackedReference<SortedOplogReader>> getSnapshot(byte[] start, byte[] end) { - // ignoring range limits since keys are stored in overlapping files - List<TrackedReference<SortedOplogReader>> snap; - synchronized (this) { - snap = new ArrayList<TrackedReference<SortedOplogReader>>(soplogs); - } - - for (TrackedReference<SortedOplogReader> tr : snap) { - tr.increment(); - } - return snap; - } - - @Override - protected synchronized void clear() throws IOException { - for (TrackedReference<SortedOplogReader> tr : soplogs) { - tr.get().close(); - } - markAsInactive(soplogs, level); - soplogs.clear(); - } - - @Override - protected synchronized void close() throws IOException { - for (TrackedReference<SortedOplogReader> tr : soplogs) { - tr.get().close(); - factory.getConfiguration().getStatistics().incActiveFiles(-1); - } - soplogs.clear(); - } - - @Override - protected void add(SortedOplog soplog) throws IOException { - SortedOplogReader rdr = soplog.createReader(); - synchronized (this) { - soplogs.addFirst(new TrackedReference<SortedOplogReader>(rdr)); - } - - if (logger.isDebugEnabled()) { - logger.debug("{}Added file {} to level {}", SizeTieredCompactor.super.logPrefix, rdr, level); - } - tracker.fileAdded(rdr.getFile(), level); - factory.getConfiguration().getStatistics().incActiveFiles(1); - } - - @Override - protected boolean compact(AtomicBoolean aborted) throws IOException { - assert level < maxLevels : "Can't compact level: " + level; - - if (!isCompacting.compareAndSet(false, true)) { - // another thread won so gracefully bow out - return false; - } - - try { - List<TrackedReference<SortedOplogReader>> snapshot = getSnapshot(null, null); - try { - SortedOplog merged = merge(snapshot, level == maxLevels - 1, aborted); - - synchronized (this) { - if (merged != null) { - levels.get(Math.min(level + 1, maxLevels - 1)).add(merged); - } - markAsInactive(snapshot, level); - soplogs.removeAll(snapshot); - } - } catch (InterruptedIOException e) { - if (logger.isDebugEnabled()) { - logger.debug("{}Aborting compaction of level {}", SizeTieredCompactor.super.logPrefix, level); - } - return false; - } - return true; - } finally { - boolean set = isCompacting.compareAndSet(true, false); - assert set; - } - } - } -}
