http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java deleted file mode 100644 index 73175e7..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SoplogToken.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import com.gemstone.gemfire.internal.DSCODE; -import com.gemstone.gemfire.internal.cache.EntryBits; - -/** - * Defines serialized tokens for soplogs. 
- */ -public enum SoplogToken { - - /** indicates the serialized value is a wildcard compares equal to any other key */ - WILDCARD( DSCODE.WILDCARD ), - - /** indicates the serialized value is a tombstone of a deleted key */ - TOMBSTONE( EntryBits.setTombstone((byte)0, true) ), - - /** indicates the serialized value is a invalid token*/ - INVALID( EntryBits.setInvalid((byte)0, true) ), - - /** indicates the serialized tombstone has been garbage collected*/ - REMOVED_PHASE2( EntryBits.setLocalInvalid((byte)0, true) ), - - /** indicates the value is serialized */ - SERIALIZED( EntryBits.setSerialized((byte)0, true) ); - - /** the serialized form of the token */ - private final byte val; - - private SoplogToken(byte val) { - this.val = val; - } - - @Override - public String toString() { - return super.toString()+" byte:"+val; - } - - /** - * Returns the serialized form of the token. - * @return the byte - */ - public byte toByte() { - return val; - } - - /** - * Returns true if either of the serialized objects is a wildcard. - * - * @param b1 the first object - * @param off1 the first offset - * @param b2 the second object - * @param off2 the second object - * @return true if a wildcard - */ - public static boolean isWildcard(byte[] b1, int off1, byte[] b2, int off2) { - return b1[off1] == DSCODE.WILDCARD || b2[off2] == DSCODE.WILDCARD; - } - - /** - * Returns true if the serialized object is a tombstone. - * - * @param b the magic entry type byte - * @return true if a tombstone - */ - public static boolean isTombstone(byte b) { - return EntryBits.isTombstone(b); - } - - /** - * Returns true if the serialized object is an invalid token. 
- * - * @param b the magic entry type byte - * @return true if invalid - */ - public static boolean isInvalid(byte b) { - return EntryBits.isInvalid(b); - } - - /** - * Returns true if the serialized tombstone was garbage collected - * - * @param b the magic entry type byte - * @return true if RemovedPhase2 - */ - public static boolean isRemovedPhase2(byte b) { - return EntryBits.isLocalInvalid(b); - } - - /** - * Returns true if the serialized object is not any token - * - *@param b the magic entry type byte - * @return true if not any token - */ - public static boolean isSerialized(byte b) { - return EntryBits.isSerialized(b); - } -} - -
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
// deleted file mode 100644
// index b301ac5..0000000
// --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedBuffer.java
// +++ /dev/null
// @@ -1,367 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.gemstone.gemfire.internal.cache.persistence.soplog;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.EnumMap;
import java.util.Iterator;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

import org.apache.logging.log4j.Logger;

import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration;
import com.gemstone.gemfire.internal.logging.LogService;

/**
 * Provides an in-memory buffer to temporarily hold key/value pairs until they
 * can be flushed to disk. Each buffer instance can be optionally associated
 * with a user-specified tag for identification purposes.
 *
 * <p>Entries live in a {@link ConcurrentSkipListMap} ordered by the comparator
 * taken from the configuration. The deferred flush action and the metadata
 * accessors are guarded by this object's monitor.
 *
 * @param <T> the tag type
 * @author bakera
 */
public class SortedBuffer<T> extends AbstractSortedReader {
  private static final Logger logger = LogService.getLogger();

  /** the tag */
  private final T tag;

  /** in-memory sorted key/value buffer */
  private final NavigableMap<byte[], byte[]> buffer;

  /** the stats */
  private final BufferStats stats;

  /** the metadata, set during flush */
  private final EnumMap<Metadata, byte[]> metadata;

  /** the command to run (or defer) when the flush is complete */
  private Runnable flushAction;

  /** prefix of the form {@code <name#tag> } prepended to every log message */
  private final String logPrefix;

  /**
   * Creates an empty buffer.
   *
   * @param config supplies the key comparator and the name used in log messages
   * @param tag the identifying tag
   */
  public SortedBuffer(SortedOplogConfiguration config, T tag) {
    assert config != null;
    assert tag != null;

    this.tag = tag;

    buffer = new ConcurrentSkipListMap<byte[], byte[]>(config.getComparator());
    stats = new BufferStats();
    metadata = new EnumMap<Metadata, byte[]>(Metadata.class);

    this.logPrefix = "<" + config.getName() + "#" + tag + "> ";
  }

  /**
   * Returns the tag associated with the buffer.
   * @return the tag
   */
  public T getTag() {
    return tag;
  }

  @Override
  public String toString() {
    return logger.getName() + this.logPrefix;
  }

  /**
   * Adds a new value to the buffer. Statistics are only updated when the key
   * was not already present.
   *
   * @param key the key
   * @param value the value
   */
  public void put(byte[] key, byte[] value) {
    if (buffer.put(key, value) == null) {
      // ASSUMPTION: updates don't significantly change the value length
      // this lets us optimize statistics calculations
      stats.add(key.length, value.length);
    }
  }

  /**
   * Allows sorted iteration over the buffer contents.
   * @return the buffer entries
   */
  public Iterable<Entry<byte[], byte[]>> entries() {
    return buffer.entrySet();
  }

  /**
   * Returns the number of entries in the buffer.
   * @return the count
   */
  public int count() {
    return buffer.size();
  }

  /**
   * Returns the size of the data in bytes.
   * @return the data size
   */
  public long dataSize() {
    return stats.totalSize();
  }

  /**
   * Clears the buffer of all entries, resets the statistics and metadata, and
   * drops any deferred flush action without running it.
   */
  public void clear() {
    if (logger.isDebugEnabled()) {
      logger.debug("{}Clearing buffer", this.logPrefix);
    }

    buffer.clear();
    stats.clear();
    metadata.clear();

    synchronized (this) {
      flushAction = null;
    }
  }

  /**
   * Returns true if the flush completion has been deferred.
   * @return true if deferred
   */
  public synchronized boolean isDeferred() {
    return flushAction != null;
  }

  /**
   * Defers the flush completion to a later time. This is used to ensure correct
   * ordering of soplogs during parallel flushes.
   *
   * @param action the action to perform when ready
   */
  public synchronized void defer(Runnable action) {
    assert flushAction == null;

    if (logger.isDebugEnabled()) {
      logger.debug("{}Deferring flush completion", this.logPrefix);
    }
    flushAction = action;
  }

  /**
   * Completes the deferred flush operation. The action is forgotten afterwards
   * even if it throws.
   */
  public synchronized void complete() {
    assert flushAction != null;

    try {
      if (logger.isDebugEnabled()) {
        logger.debug("{}Completing deferred flush operation", this.logPrefix);
      }
      flushAction.run();

    } finally {
      flushAction = null;
    }
  }

  /**
   * Returns the buffer metadata. The returned map is the live internal map,
   * not a copy.
   *
   * @return the metadata
   */
  public synchronized EnumMap<Metadata, byte[]> getMetadata() {
    return metadata;
  }

  /**
   * Returns the metadata value for the given key.
   *
   * @param name the metadata name
   * @return the requested metadata
   */
  public synchronized byte[] getMetadata(Metadata name) {
    return metadata.get(name);
  }

  /**
   * Sets the metadata for the buffer. This is not available until the buffer
   * is about to be flushed. The supplied entries are merged into the existing
   * metadata; a null argument is ignored.
   *
   * @param metadata the metadata
   */
  public synchronized void setMetadata(EnumMap<Metadata, byte[]> metadata) {
    if (metadata != null) {
      this.metadata.putAll(metadata);
    }
  }

  /**
   * Always answers true: the buffer has no bloom filter, so every key has to
   * be looked up.
   */
  @Override
  public boolean mightContain(byte[] key) {
    return true;
  }

  /**
   * Looks up a key.
   *
   * @param key the key
   * @return the value wrapped in a buffer, or null if the key is absent
   */
  @Override
  public ByteBuffer read(byte[] key) throws IOException {
    byte[] val = buffer.get(key);
    if (val != null) {
      return ByteBuffer.wrap(val);
    }
    return null;
  }

  /**
   * Scans a key range. When a filter is supplied and rejects this buffer's
   * metadata, an empty iterator is returned. A null bound leaves that end of
   * the range open.
   */
  @Override
  public SortedIterator<ByteBuffer> scan(
      byte[] from, boolean fromInclusive,
      byte[] to, boolean toInclusive,
      boolean ascending,
      MetadataFilter filter) {

    if (filter == null || filter.accept(metadata.get(filter.getName()))) {
      NavigableMap<byte[],byte[]> subset = ascending ? buffer : buffer.descendingMap();
      if (from == null && to == null) {
        // we're good
      } else if (from == null) {
        subset = subset.headMap(to, toInclusive);
      } else if (to == null) {
        subset = subset.tailMap(from, fromInclusive);
      } else {
        subset = subset.subMap(from, fromInclusive, to, toInclusive);
      }
      return new BufferIterator(subset.entrySet().iterator());
    }
    return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator());
  }

  @Override
  public SerializedComparator getComparator() {
    return (SerializedComparator) buffer.comparator();
  }

  @Override
  public SortedStatistics getStatistics() {
    return stats;
  }

  /**
   * Closes the buffer by dropping any deferred flush action. The buffered
   * entries are left untouched.
   */
  @Override
  public void close() throws IOException {
    if (logger.isDebugEnabled()) {
      logger.debug("{}Closing buffer", this.logPrefix);
    }

    synchronized (this) {
      flushAction = null;
    }
  }

  /**
   * Allows sorted iteration over the buffer contents.
   */
  public static class BufferIterator
    extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer>
    implements SortedIterator<ByteBuffer>
  {
    /** the backing iterator */
    private final Iterator<Entry<byte[], byte[]>> entries;

    /** the iteration cursor */
    private Entry<byte[], byte[]> current;

    public BufferIterator(Iterator<Entry<byte[], byte[]>> iterator) {
      this.entries = iterator;
    }

    @Override
    public ByteBuffer key() {
      return ByteBuffer.wrap(current.getKey());
    }

    @Override
    public ByteBuffer value() {
      return ByteBuffer.wrap(current.getValue());
    }

    @Override
    public void close() {
    }

    /**
     * Moves the cursor to the next entry.
     *
     * @return false once the backing iterator is exhausted
     */
    @Override
    protected boolean step() {
      current = entries.hasNext() ? entries.next() : null;
      return current != null;
    }
  }

  /**
   * Running statistics for the buffer. The counters are guarded by this
   * object's monitor; first/last key are read straight from the buffer.
   */
  private class BufferStats implements SortedStatistics {
    /** data size */
    private long totalSize;

    /** key count */
    private long keys;

    /** avg key size */
    private double avgKeySize;

    /** avg value size */
    private double avgValueSize;

    private synchronized void clear() {
      totalSize = 0;
      keys = 0;
      avgKeySize = 0;
      avgValueSize = 0;
    }

    /**
     * Records one newly inserted entry.
     *
     * @param keyLength the key length in bytes
     * @param valueLength the value length in bytes
     */
    private synchronized void add(int keyLength, int valueLength) {
      totalSize += keyLength + valueLength;

      // incremental mean: avg' = (x + n * avg) / (n + 1)
      avgKeySize = (keyLength + keys * avgKeySize) / (keys + 1);
      // the value average must be fed the value length (it used to be fed
      // the key length, which made avgValueSize() mirror avgKeySize())
      avgValueSize = (valueLength + keys * avgValueSize) / (keys + 1);

      keys++;
    }

    @Override
    public synchronized long keyCount() {
      return keys;
    }

    @Override
    public byte[] firstKey() {
      return buffer.firstKey();
    }

    @Override
    public byte[] lastKey() {
      return buffer.lastKey();
    }

    @Override
    public synchronized double avgKeySize() {
      return avgKeySize;
    }

    @Override
    public synchronized double avgValueSize() {
      return avgValueSize;
    }

    @Override
    public void close() {
    }

    public synchronized long totalSize() {
      return totalSize;
    }
  }
}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
// deleted file mode 100644
// index 95fb411..0000000
// --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplog.java
// +++ /dev/null
// @@ -1,158 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one or more
//  * contributor license agreements.
//  * See the NOTICE file distributed with
//  * this work for additional information regarding copyright ownership.
//  * The ASF licenses this file to You under the Apache License, Version 2.0
//  * (the "License"); you may not use this file except in compliance with
//  * the License.  You may obtain a copy of the License at
//  *
//  *      http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */
package com.gemstone.gemfire.internal.cache.persistence.soplog;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumMap;

import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;

/**
 * API for a sorted oplog ("soplog"): a store of key/value pairs that is
 * written once in key order and then read. Keys are expected to be
 * lexicographically comparable {@code byte[]} arrays.
 *
 * @author bakera
 */
public interface SortedOplog {

  /**
   * Probabilistic membership test for the keys of a soplog.
   */
  interface BloomFilter {
    /**
     * Tests a key against the filter. False positives are allowed, false
     * negatives cannot occur.
     *
     * @param key the key to test
     * @return true if the key might be present
     */
    boolean mightContain(byte[] key);
  }

  /**
   * Read side of a soplog: reads key/value pairs from the sorted file and
   * exposes the file itself.
   */
  interface SortedOplogReader extends SortedReader<ByteBuffer> {
    /**
     * @return the bloom filter associated with this reader
     */
    BloomFilter getBloomFilter();

    /**
     * Looks up one metadata value.
     *
     * @param name the metadata name
     * @return the requested metadata
     * @throws IOException error reading metadata
     */
    byte[] getMetadata(Metadata name) throws IOException;

    /**
     * @return the file used to persist the soplog contents
     */
    File getFile();

    /**
     * @return the file name
     */
    String getFileName();

    /**
     * Renames the file.
     *
     * @param name the new name
     * @throws IOException if the rename fails
     */
    void rename(String name) throws IOException;

    /**
     * @return the modification timestamp of the file
     * @throws IOException if the timestamp cannot be read
     */
    long getModificationTimeStamp() throws IOException;

    /**
     * Deletes the sorted oplog file.
     *
     * @throws IOException if the delete fails
     */
    void delete() throws IOException;

    /**
     * @return true if the reader is closed
     */
    boolean isClosed();
  }

  /**
   * Write side of a soplog. Entries must be appended in sorted order: every
   * key has to be greater than or equal to the previously appended key.
   */
  interface SortedOplogWriter {
    /**
     * Appends the next entry, given as buffers. The key is expected to be
     * greater than or equal to the last key that was appended.
     *
     * @param key the key
     * @param value the value
     * @throws IOException write error
     */
    void append(ByteBuffer key, ByteBuffer value) throws IOException;

    /**
     * Appends the next entry, given as arrays. The key is expected to be
     * greater than or equal to the last key that was appended.
     *
     * @param key the key
     * @param value the value
     * @throws IOException write error
     */
    void append(byte[] key, byte[] value) throws IOException;

    /**
     * Writes the optional user and system metadata, then closes the file.
     *
     * @param metadata the metadata to include
     * @throws IOException unable to close file
     */
    void close(EnumMap<Metadata, byte[]> metadata) throws IOException;

    /**
     * Closes and removes the file; used to clean up after an error.
     *
     * @throws IOException error closing
     */
    void closeAndDelete() throws IOException;
  }

  /**
   * Creates a new sorted reader.
   *
   * @return the reader
   * @throws IOException error creating reader
   */
  SortedOplogReader createReader() throws IOException;

  /**
   * Creates a new sorted writer.
   *
   * @return the writer
   * @throws IOException error creating writer
   */
  SortedOplogWriter createWriter() throws IOException;
}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
// deleted file mode 100644
// index a470d7e..0000000
// --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogFactory.java
// +++ /dev/null
// @@ -1,278 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one or more
//  * contributor license agreements.  See the NOTICE file distributed with
//  * this work for additional information regarding copyright ownership.
//  * The ASF licenses this file to You under the Apache License, Version 2.0
//  * (the "License"); you may not use this file except in compliance with
//  * the License.  You may obtain a copy of the License at
//  *
//  *      http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */
package com.gemstone.gemfire.internal.cache.persistence.soplog;

import java.io.File;
import java.io.IOException;
import java.util.EnumMap;

import org.apache.hadoop.hbase.io.hfile.BlockCache;

import com.gemstone.gemfire.internal.cache.persistence.soplog.Compactor.MetadataCompactor;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata;
import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator;

/**
 * Provides a means to construct a soplog.
 */
public interface SortedOplogFactory {
  /**
   * Configures a <code>SortedOplog</code>. Setters return {@code this} so
   * calls can be chained.
   *
   * @author bakera
   */
  public class SortedOplogConfiguration {
    /** the default metadata compactor: keeps the first value, discards the second */
    public static MetadataCompactor DEFAULT_METADATA_COMPACTOR = new MetadataCompactor() {
      @Override
      public byte[] compact(byte[] metadata1, byte[] metadata2) {
        return metadata1;
      }
    };

    /**
     * Defines the available checksum algorithms.
     */
    public enum Checksum {
      NONE,
      CRC32
    }

    /**
     * Defines the available compression algorithms.
     */
    public enum Compression {
      NONE,
    }

    /**
     * Defines the available key encodings.
     */
    public enum KeyEncoding {
      NONE,
    }

    /** the soplog name */
    private final String name;

    /** the statistics */
    private final SortedOplogStatistics stats;

    /** the store-level statistics */
    private final HFileStoreStatistics storeStats;

    /** true if bloom filters are enabled */
    private boolean bloom;

    /** the soplog block size */
    private int blockSize;

    /** the number of bytes for each checksum */
    private int bytesPerChecksum;

    /** the checksum type */
    private Checksum checksum;

    /** the compression type */
    private Compression compression;

    /** the key encoding type */
    private KeyEncoding keyEncoding;

    /** the comparator */
    private SerializedComparator comparator;

    /**
     * The metadata compactors registered per metadata name. Initialized
     * eagerly: the field used to be left null, so both
     * {@link #addMetadataCompactor} and {@link #getMetadataCompactor} threw
     * a NullPointerException on every call.
     */
    private final EnumMap<Metadata, MetadataCompactor> metaCompactors =
        new EnumMap<Metadata, MetadataCompactor>(Metadata.class);

    /** the block cache; null when the single-argument constructor is used */
    private BlockCache blockCache;

    /** whether data blocks are cached on read */
    private boolean cacheDataBlocksOnRead;

    /**
     * Creates a configuration with default settings, no block cache, and
     * freshly created statistics objects.
     *
     * @param name the soplog name
     */
    public SortedOplogConfiguration(String name) {
      this(name, null, new SortedOplogStatistics("GridDBRegionStatistics", name), new HFileStoreStatistics("GridDBStoreStatistics", name));
    }

    /**
     * Creates a configuration with default settings: bloom filter enabled,
     * 64 KiB blocks, 16 KiB per checksum, no checksum, no compression, no key
     * encoding, a {@code ByteComparator}, and data blocks cached on read.
     *
     * @param name the soplog name
     * @param blockCache the block cache
     * @param stats the soplog statistics
     * @param storeStats the store statistics
     */
    public SortedOplogConfiguration(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) {
      this.name = name;
      this.stats = stats;

      // defaults
      bloom = true;
      blockSize = 1 << 16;
      bytesPerChecksum = 1 << 14;
      checksum = Checksum.NONE;
      compression = Compression.NONE;
      keyEncoding = KeyEncoding.NONE;
      comparator = new ByteComparator();
      this.cacheDataBlocksOnRead = true;
      this.storeStats = storeStats;
      this.blockCache = blockCache;
    }

    public SortedOplogConfiguration setBloomFilterEnabled(boolean enabled) {
      this.bloom = enabled;
      return this;
    }

    public SortedOplogConfiguration setBlockSize(int size) {
      this.blockSize = size;
      return this;
    }

    public SortedOplogConfiguration setBytesPerChecksum(int bytes) {
      this.bytesPerChecksum = bytes;
      return this;
    }

    public SortedOplogConfiguration setChecksum(Checksum type) {
      this.checksum = type;
      return this;
    }

    public SortedOplogConfiguration setCompression(Compression type) {
      this.compression = type;
      return this;
    }

    public SortedOplogConfiguration setKeyEncoding(KeyEncoding type) {
      this.keyEncoding = type;
      return this;
    }

    public SortedOplogConfiguration setComparator(SerializedComparator comp) {
      this.comparator = comp;
      return this;
    }

    /**
     * Registers the compactor to use for one metadata name, replacing any
     * compactor registered for that name before.
     *
     * @param name the metadata name
     * @param compactor the compactor
     * @return this configuration
     */
    public SortedOplogConfiguration addMetadataCompactor(Metadata name, MetadataCompactor compactor) {
      metaCompactors.put(name, compactor);
      return this;
    }

    /**
     * Returns the soplog name.
     * @return the name
     */
    public String getName() {
      return name;
    }

    /**
     * Returns the statistics.
     * @return the statistics
     */
    public SortedOplogStatistics getStatistics() {
      return stats;
    }

    /**
     * Returns the store statistics.
     * @return the store statistics
     */
    public HFileStoreStatistics getStoreStatistics() {
      return storeStats;
    }

    /**
     * Returns true if the bloom filter is enabled.
     * @return true if enabled
     */
    public boolean isBloomFilterEnabled() {
      return bloom;
    }

    /**
     * Returns the block size in bytes.
     * @return the block size
     */
    public int getBlockSize() {
      return blockSize;
    }

    /**
     * Returns the number of bytes per checksum.
     * @return the bytes
     */
    public int getBytesPerChecksum() {
      return bytesPerChecksum;
    }

    /**
     * Returns the checksum type.
     * @return the checksum
     */
    public Checksum getChecksum() {
      return checksum;
    }

    /**
     * Returns the compression type.
     * @return the compression
     */
    public Compression getCompression() {
      return compression;
    }

    /**
     * Returns the key encoding type.
     * @return the key encoding
     */
    public KeyEncoding getKeyEncoding() {
      return keyEncoding;
    }

    /**
     * Returns the comparator.
     * @return the comparator
     */
    public SerializedComparator getComparator() {
      return comparator;
    }

    /**
     * Returns the metadata compactor for the given name, falling back to
     * {@link #DEFAULT_METADATA_COMPACTOR} when none was registered.
     *
     * @param name the metadata name
     * @return the compactor
     */
    public MetadataCompactor getMetadataCompactor(Metadata name) {
      MetadataCompactor mc = metaCompactors.get(name);
      if (mc != null) {
        return mc;
      }
      return DEFAULT_METADATA_COMPACTOR;
    }

    /**
     * Returns the block cache.
     * @return the block cache
     */
    public BlockCache getBlockCache() {
      return this.blockCache;
    }

    /**
     * Returns whether data blocks are cached on read.
     * @return true if data blocks are cached on read
     */
    public boolean getCacheDataBlocksOnRead() {
      return cacheDataBlocksOnRead;
    }
  }

  /**
   * Returns the configuration.
   * @return the configuration
   */
  SortedOplogConfiguration getConfiguration();

  /**
   * Creates a new soplog.
   *
   * @param name the filename
   * @return the soplog
   * @throws IOException error creating soplog
   */
  SortedOplog createSortedOplog(File name) throws IOException;
}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
// deleted file mode 100644
// index 2900229..0000000
// --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSet.java
// +++ /dev/null
// @@ -1,118 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one or more
//  * contributor license agreements.  See the NOTICE file distributed with
//  * this work for additional information regarding copyright ownership.
//  * The ASF licenses this file to You under the Apache License, Version 2.0
//  * (the "License"); you may not use this file except in compliance with
//  * the License.
//  * You may obtain a copy of the License at
//  *
//  *      http://www.apache.org/licenses/LICENSE-2.0
//  *
//  * Unless required by applicable law or agreed to in writing, software
//  * distributed under the License is distributed on an "AS IS" BASIS,
//  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  * See the License for the specific language governing permissions and
//  * limitations under the License.
//  */
package com.gemstone.gemfire.internal.cache.persistence.soplog;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumMap;

/**
 * A single sorted reader over three sources: the current SBuffer, the
 * SBuffers that are still waiting to be flushed, and the soplogs that already
 * exist.
 *
 * @author bakera
 */
public interface SortedOplogSet extends SortedReader<ByteBuffer> {

  /**
   * Callback through which the outcome of an asynchronous operation is
   * reported.
   */
  interface FlushHandler {
    /**
     * Called when the operation completed successfully.
     */
    void complete();

    /**
     * Called when the operation completed with an error.
     *
     * @param t the error
     */
    void error(Throwable t);
  }

  /**
   * Inserts or updates an entry in the current buffer. The call may block if
   * the current buffer is full and there are too many outstanding write
   * requests.
   *
   * @param key the key
   * @param value the value
   * @throws IOException if the entry cannot be written
   */
  void put(byte[] key, byte[] value) throws IOException;

  /**
   * @return the size of the current buffer in bytes
   */
  long bufferSize();

  /**
   * @return the size of the unflushed buffers in bytes
   */
  long unflushedSize();

  /**
   * Requests that the current buffer be flushed to disk. The call may block
   * if there are too many outstanding write requests.
   *
   * @param metadata supplemental data to be included in the soplog
   * @param handler the flush completion callback
   * @throws IOException error preparing flush
   */
  void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) throws IOException;

  /**
   * Flushes the current buffer and closes the soplog set, blocking until the
   * flush is completed.
   *
   * @param metadata supplemental data to be included in the soplog
   * @throws IOException error during flush
   */
  void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException;

  /**
   * @return the configured compaction strategy
   */
  Compactor getCompactor();

  /**
   * Clears the current buffer, any existing buffers, and all active soplogs.
   *
   * @throws IOException unable to clear
   */
  void clear() throws IOException;

  /**
   * Clears the existing contents and closes the soplog set.
   *
   * @throws IOException unable to destroy
   */
  void destroy() throws IOException;

  /**
   * @return true if the set is closed
   */
  boolean isClosed();

  /**
   * @return the soplog factory
   */
  SortedOplogFactory getFactory();
}
// http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
// ----------------------------------------------------------------------
// diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
// deleted file mode 100644
// index 2cf1191..0000000
// --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/SortedOplogSetImpl.java
// +++ /dev/null
// @@ -1,780 +0,0 @@
// /*
//  * Licensed to the Apache Software Foundation (ASF) under one or more
//  * contributor license agreements.
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Deque; -import java.util.EnumMap; -import java.util.List; -import java.util.Map.Entry; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogReader; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog.SortedOplogWriter; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.util.AbortableTaskService; -import com.gemstone.gemfire.internal.util.AbortableTaskService.AbortableTask; - -/** - * Provides a unifies view across a set of sbuffers and soplogs. Updates are - * made into the current sbuffer. 
When requested, the current sbuffer will be - * flushed and subsequent updates will flow into a new sbuffer. All flushes are - * done on a background thread. - * - * @author bakera - */ -public class SortedOplogSetImpl extends AbstractSortedReader implements SortedOplogSet { - private static final Logger logger = LogService.getLogger(); - - /** creates new soplogs */ - private final SortedOplogFactory factory; - - /** the background flush thread pool */ - private final AbortableTaskService flusher; - - /** the compactor */ - private final Compactor compactor; - - /** the current sbuffer */ - private final AtomicReference<SortedBuffer<Integer>> current; - - /** the buffer count */ - private final AtomicInteger bufferCount; - - /** the unflushed sbuffers */ - private final Deque<SortedBuffer<Integer>> unflushed; - - /** the lock for access to unflushed and soplogs */ - private final ReadWriteLock rwlock; - - /** test hook for clear/close/destroy during flush */ - volatile CountDownLatch testDelayDuringFlush; - - /** test hook to cause IOException during flush */ - volatile boolean testErrorDuringFlush; - - private final String logPrefix; - - public SortedOplogSetImpl(final SortedOplogFactory factory, Executor exec, Compactor ctor) throws IOException { - this.factory = factory; - this.flusher = new AbortableTaskService(exec); - this.compactor = ctor; - - rwlock = new ReentrantReadWriteLock(); - bufferCount = new AtomicInteger(0); - unflushed = new ArrayDeque<SortedBuffer<Integer>>(); - current = new AtomicReference<SortedBuffer<Integer>>( - new SortedBuffer<Integer>(factory.getConfiguration(), 0)); - - this.logPrefix = "<" + factory.getConfiguration().getName() + "> "; - if (logger.isDebugEnabled()) { - logger.debug("{}Creating soplog set", this.logPrefix); - } - } - - @Override - public boolean mightContain(byte[] key) throws IOException { - // loops through the following readers: - // current sbuffer - // unflushed sbuffers - // soplogs - // - // The loop has been 
unrolled for efficiency. - // - if (getCurrent().mightContain(key)) { - return true; - } - - // snapshot the sbuffers and soplogs for stable iteration - List<SortedReader<ByteBuffer>> readers; - Collection<TrackedReference<SortedOplogReader>> soplogs; - rwlock.readLock().lock(); - try { - readers = new ArrayList<SortedReader<ByteBuffer>>(unflushed); - soplogs = compactor.getActiveReaders(key, key); - for (TrackedReference<SortedOplogReader> tr : soplogs) { - readers.add(tr.get()); - } - } finally { - rwlock.readLock().unlock(); - } - - try { - for (SortedReader<ByteBuffer> rdr : readers) { - if (rdr.mightContain(key)) { - return true; - } - } - return false; - } finally { - TrackedReference.decrementAll(soplogs); - } - } - - @Override - public ByteBuffer read(byte[] key) throws IOException { - // loops through the following readers: - // current sbuffer - // unflushed sbuffers - // soplogs - // - // The loop has been slightly unrolled for efficiency. - // - ByteBuffer val = getCurrent().read(key); - if (val != null) { - return val; - } - - // snapshot the sbuffers and soplogs for stable iteration - List<SortedReader<ByteBuffer>> readers; - Collection<TrackedReference<SortedOplogReader>> soplogs; - rwlock.readLock().lock(); - try { - readers = new ArrayList<SortedReader<ByteBuffer>>(unflushed); - soplogs = compactor.getActiveReaders(key, key); - for (TrackedReference<SortedOplogReader> tr : soplogs) { - readers.add(tr.get()); - } - } finally { - rwlock.readLock().unlock(); - } - - try { - for (SortedReader<ByteBuffer> rdr : readers) { - if (rdr.mightContain(key)) { - val = rdr.read(key); - if (val != null) { - return val; - } - } - } - return null; - } finally { - TrackedReference.decrementAll(soplogs); - } - } - - @Override - public SortedIterator<ByteBuffer> scan( - byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive, - boolean ascending, - MetadataFilter filter) throws IOException { - - SerializedComparator sc = 
factory.getConfiguration().getComparator(); - sc = ascending ? sc : ReversingSerializedComparator.reverse(sc); - - List<SortedIterator<ByteBuffer>> scans = new ArrayList<SortedIterator<ByteBuffer>>(); - Collection<TrackedReference<SortedOplogReader>> soplogs; - rwlock.readLock().lock(); - try { - scans.add(getCurrent().scan(from, fromInclusive, to, toInclusive, ascending, filter)); - for (SortedBuffer<Integer> sb : unflushed) { - scans.add(sb.scan(from, fromInclusive, to, toInclusive, ascending, filter)); - } - soplogs = compactor.getActiveReaders(from, to); - } finally { - rwlock.readLock().unlock(); - } - - for (TrackedReference<SortedOplogReader> tr : soplogs) { - scans.add(tr.get().scan(from, fromInclusive, to, toInclusive, ascending, filter)); - } - return new MergedIterator(sc, soplogs, scans); - } - - @Override - public void put(byte[] key, byte[] value) { - assert key != null; - assert value != null; - - long start = factory.getConfiguration().getStatistics().getPut().begin(); - getCurrent().put(key, value); - factory.getConfiguration().getStatistics().getPut().end(value.length, start); - } - - @Override - public long bufferSize() { - return getCurrent().dataSize(); - } - - @Override - public long unflushedSize() { - long size = 0; - rwlock.readLock().lock(); - try { - for (SortedBuffer<Integer> sb : unflushed) { - size += sb.dataSize(); - } - } finally { - rwlock.readLock().unlock(); - } - return size; - } - - @Override - public void flushAndClose(EnumMap<Metadata, byte[]> metadata) throws IOException { - final AtomicReference<Throwable> err = new AtomicReference<Throwable>(null); - flush(metadata, new FlushHandler() { - @Override public void complete() { } - @Override public void error(Throwable t) { err.set(t); } - }); - - // waits for flush completion - close(); - - Throwable t = err.get(); - if (t != null) { - throw new IOException(t); - } - } - - @Override - public void flush(EnumMap<Metadata, byte[]> metadata, FlushHandler handler) { - assert handler 
!= null; - - long start = factory.getConfiguration().getStatistics().getFlush().begin(); - - // flip to a new buffer - final SortedBuffer<Integer> sb; - rwlock.writeLock().lock(); - try { - if (isClosed()) { - handler.complete(); - factory.getConfiguration().getStatistics().getFlush().end(0, start); - - return; - } - - sb = flipBuffer(); - if (sb.count() == 0) { - if (logger.isDebugEnabled()) { - logger.debug("{}Skipping flush of empty buffer {}", this.logPrefix, sb); - } - handler.complete(); - return; - } - - sb.setMetadata(metadata); - unflushed.addFirst(sb); - - // Note: this is queued while holding the lock to ensure correct ordering - // on the executor queue. Don't use a bounded queue here or we will block - // the flush invoker. - flusher.execute(new FlushTask(handler, sb, start)); - - } finally { - rwlock.writeLock().unlock(); - } - } - - @Override - public void clear() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Clearing soplog set", this.logPrefix); - } - - long start = factory.getConfiguration().getStatistics().getClear().begin(); - - // acquire lock to ensure consistency with flushes - rwlock.writeLock().lock(); - try { - SortedBuffer<Integer> tmp = current.get(); - if (tmp != null) { - tmp.clear(); - } - - flusher.abortAll(); - for (SortedBuffer<Integer> sb : unflushed) { - sb.clear(); - } - - unflushed.clear(); - compactor.clear(); - - releaseTestDelay(); - flusher.waitForCompletion(); - factory.getConfiguration().getStatistics().getClear().end(start); - - } catch (IOException e) { - factory.getConfiguration().getStatistics().getClear().error(start); - throw (IOException) e.fillInStackTrace(); - - } finally { - rwlock.writeLock().unlock(); - } - } - - @Override - public void destroy() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Destroying soplog set", this.logPrefix); - } - - long start = factory.getConfiguration().getStatistics().getDestroy().begin(); - try { - unsetCurrent(); - clear(); - 
close(); - - factory.getConfiguration().getStatistics().getDestroy().end(start); - - } catch (IOException e) { - factory.getConfiguration().getStatistics().getDestroy().error(start); - throw (IOException) e.fillInStackTrace(); - } - } - - @Override - public void close() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Closing soplog set", this.logPrefix); - } - - unsetCurrent(); - releaseTestDelay(); - - flusher.waitForCompletion(); - compactor.close(); - } - - @Override - public SerializedComparator getComparator() { - return factory.getConfiguration().getComparator(); - } - - @Override - public SortedStatistics getStatistics() throws IOException { - List<SortedStatistics> stats = new ArrayList<SortedStatistics>(); - Collection<TrackedReference<SortedOplogReader>> soplogs; - - // snapshot, this is expensive - rwlock.readLock().lock(); - try { - stats.add(getCurrent().getStatistics()); - for (SortedBuffer<Integer> sb : unflushed) { - stats.add(sb.getStatistics()); - } - soplogs = compactor.getActiveReaders(null, null); - } finally { - rwlock.readLock().unlock(); - } - - for (TrackedReference<SortedOplogReader> tr : soplogs) { - stats.add(tr.get().getStatistics()); - } - return new MergedStatistics(stats, soplogs); - } - - @Override - public Compactor getCompactor() { - return compactor; - } - - @Override - public boolean isClosed() { - return current.get() == null; - } - - @Override - public SortedOplogFactory getFactory() { - return factory; - } - - private SortedBuffer<Integer> flipBuffer() { - final SortedBuffer<Integer> sb; - sb = getCurrent(); - SortedBuffer<Integer> next = new SortedBuffer<Integer>( - factory.getConfiguration(), - bufferCount.incrementAndGet()); - - current.set(next); - if (logger.isDebugEnabled()) { - logger.debug("{}Switching from buffer {} to {}", this.logPrefix, sb, next); - } - return sb; - } - - private SortedBuffer<Integer> getCurrent() { - SortedBuffer<Integer> tmp = current.get(); - if (tmp == null) { - throw 
new IllegalStateException("Closed"); - } - return tmp; - } - - private void unsetCurrent() { - rwlock.writeLock().lock(); - try { - SortedBuffer<Integer> tmp = current.getAndSet(null); - if (tmp != null) { - tmp.clear(); - } - } finally { - rwlock.writeLock().unlock(); - } - } - - private void releaseTestDelay() { - if (testDelayDuringFlush != null) { - if (logger.isDebugEnabled()) { - logger.debug("{}Releasing testDelayDuringFlush", this.logPrefix); - } - testDelayDuringFlush.countDown(); - } - } - - private class FlushTask implements AbortableTask { - private final FlushHandler handler; - private final SortedBuffer<Integer> buffer; - private final long start; - - public FlushTask(FlushHandler handler, SortedBuffer<Integer> buffer, long start) { - this.handler = handler; - this.buffer = buffer; - this.start = start; - } - - @Override - public void runOrAbort(final AtomicBoolean aborted) { - try { - // First transfer the contents of the buffer to a new soplog. - final SortedOplog soplog = writeBuffer(buffer, aborted); - - // If we are aborted, someone else will cleanup the unflushed queue - if (soplog == null || !lockOrAbort(aborted)) { - handler.complete(); - return; - } - - try { - Runnable action = new Runnable() { - @Override - public void run() { - try { - compactor.add(soplog); - compactor.compact(false, null); - - unflushed.removeFirstOccurrence(buffer); - - // TODO need to invoke this while NOT holding write lock - handler.complete(); - factory.getConfiguration().getStatistics().getFlush().end(buffer.dataSize(), start); - - } catch (Exception e) { - handleError(e, aborted); - return; - } - } - }; - - // Enforce flush ordering for consistency. If the previous buffer flush - // is incomplete, we defer completion and release the thread to avoid - // deadlocks. 
- if (buffer == unflushed.peekLast()) { - action.run(); - - SortedBuffer<Integer> tail = unflushed.peekLast(); - while (tail != null && tail.isDeferred() && !aborted.get()) { - // TODO need to invoke this while NOT holding write lock - tail.complete(); - tail = unflushed.peekLast(); - } - } else { - buffer.defer(action); - } - } finally { - rwlock.writeLock().unlock(); - } - } catch (Exception e) { - handleError(e, aborted); - } - } - - @Override - public void abortBeforeRun() { - handler.complete(); - factory.getConfiguration().getStatistics().getFlush().end(start); - } - - private void handleError(Exception e, AtomicBoolean aborted) { - if (lockOrAbort(aborted)) { - try { - unflushed.removeFirstOccurrence(buffer); - } finally { - rwlock.writeLock().unlock(); - } - } - - handler.error(e); - factory.getConfiguration().getStatistics().getFlush().error(start); - } - - private SortedOplog writeBuffer(SortedBuffer<Integer> sb, AtomicBoolean aborted) - throws IOException { - File f = compactor.getFileset().getNextFilename(); - if (logger.isDebugEnabled()) { - logger.debug("{}Flushing buffer {} to {}", SortedOplogSetImpl.this.logPrefix, sb, f); - } - - SortedOplog so = factory.createSortedOplog(f); - SortedOplogWriter writer = so.createWriter(); - try { - if (testErrorDuringFlush) { - throw new IOException("Flush error due to testErrorDuringFlush=true"); - } - - for (Entry<byte[], byte[]> entry : sb.entries()) { - if (aborted.get()) { - writer.closeAndDelete(); - return null; - } - writer.append(entry.getKey(), entry.getValue()); - } - - checkTestDelay(); - - writer.close(buffer.getMetadata()); - return so; - - } catch (IOException e) { - if (logger.isDebugEnabled()) { - logger.debug("{}Encountered error while flushing buffer {}", SortedOplogSetImpl.this.logPrefix, sb, e); - } - - writer.closeAndDelete(); - throw e; - } - } - - private void checkTestDelay() { - if (testDelayDuringFlush != null) { - try { - if (logger.isDebugEnabled()) { - logger.debug("{}Waiting for 
testDelayDuringFlush", SortedOplogSetImpl.this.logPrefix); - } - testDelayDuringFlush.await(); - - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - private boolean lockOrAbort(AtomicBoolean abort) { - try { - while (!abort.get()) { - if (rwlock.writeLock().tryLock(10, TimeUnit.MILLISECONDS)) { - return true; - } - } - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - return false; - } - } - - private class MergedStatistics implements SortedStatistics { - private final List<SortedStatistics> stats; - private final Collection<TrackedReference<SortedOplogReader>> soplogs; - - public MergedStatistics(List<SortedStatistics> stats, Collection<TrackedReference<SortedOplogReader>> soplogs) { - this.stats = stats; - this.soplogs = soplogs; - } - - @Override - public long keyCount() { - // TODO we have no way of determining the overall key population - // just assume no overlap for now - long keys = 0; - for (SortedStatistics ss : stats) { - keys += ss.keyCount(); - } - return keys; - } - - @Override - public byte[] firstKey() { - byte[] first = stats.get(0).firstKey(); - for (int i = 1; i < stats.size(); i++) { - byte[] tmp = stats.get(i).firstKey(); - if (getComparator().compare(first, tmp) > 0) { - first = tmp; - } - } - return first; - } - - @Override - public byte[] lastKey() { - byte[] last = stats.get(0).lastKey(); - for (int i = 1; i < stats.size(); i++) { - byte[] tmp = stats.get(i).lastKey(); - if (getComparator().compare(last, tmp) < 0) { - last = tmp; - } - } - return last; - } - - @Override - public double avgKeySize() { - double avg = 0; - for (SortedStatistics ss : stats) { - avg += ss.avgKeySize(); - } - return avg / stats.size(); - } - - @Override - public double avgValueSize() { - double avg = 0; - for (SortedStatistics ss : stats) { - avg += ss.avgValueSize(); - } - return avg / stats.size(); - } - - @Override - public void close() { - TrackedReference.decrementAll(soplogs); - } - } - - 
/** - * Provides ordered iteration across a set of sorted data sets. - */ - public static class MergedIterator - extends AbstractKeyValueIterator<ByteBuffer, ByteBuffer> - implements SortedIterator<ByteBuffer> - { - /** the comparison operator */ - private final SerializedComparator comparator; - - /** the reference counted soplogs */ - private final Collection<TrackedReference<SortedOplogReader>> soplogs; - - /** the backing iterators */ - private final List<SortedIterator<ByteBuffer>> iters; - - /** the current key */ - private ByteBuffer key; - - /** the current value */ - private ByteBuffer value; - - public MergedIterator(SerializedComparator comparator, - Collection<TrackedReference<SortedOplogReader>> soplogs, - List<SortedIterator<ByteBuffer>> iters) { - this.comparator = comparator; - this.soplogs = soplogs; - this.iters = iters; - - // initialize iteration positions - int i = 0; - while (i < iters.size()) { - i = advance(i); - } - } - - @Override - public ByteBuffer key() { - return key; - } - - @Override - public ByteBuffer value() { - return value; - } - - @Override - protected boolean step() { - if (iters.isEmpty() || readerIsClosed()) { - return false; - } - - int cursor = 0; - key = iters.get(cursor).key(); - - int i = 1; - while (i < iters.size()) { - ByteBuffer tmp = iters.get(i).key(); - - int diff = comparator.compare(tmp.array(), tmp.arrayOffset(), tmp.remaining(), - key.array(), key.arrayOffset(), key.remaining()); - if (diff < 0) { - cursor = i++; - key = tmp; - - } else if (diff == 0) { - i = advance(i); - - } else { - i++; - } - } - - value = iters.get(cursor).value(); - advance(cursor); - - return true; - } - - @Override - public void close() { - for (SortedIterator<ByteBuffer> iter : iters) { - iter.close(); - } - TrackedReference.decrementAll(soplogs); - } - - private int advance(int idx) { - // either advance the cursor or remove the iterator - if (!iters.get(idx).hasNext()) { - iters.remove(idx).close(); - return idx; - } - 
iters.get(idx).next(); - return idx + 1; - } - - private boolean readerIsClosed() { - for (TrackedReference<SortedOplogReader> tr : soplogs) { - if (tr.get().isClosed()) { - return true; - } - } - return false; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java deleted file mode 100644 index eb5154c..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/BlockCacheHolder.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile; - -import org.apache.hadoop.hbase.io.hfile.BlockCache; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics; - -public class BlockCacheHolder { - private BlockCache cache; - private HFileStoreStatistics stats; - - public BlockCacheHolder(HFileStoreStatistics stats, BlockCache cache) { - this.stats = stats; - this.cache = cache; - } - - public synchronized BlockCache getBlockCache() { - return cache; - } - - public synchronized HFileStoreStatistics getHFileStoreStats() { - return stats; - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java deleted file mode 100644 index 56c6960..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplog.java +++ /dev/null @@ -1,694 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile; - -import java.io.ByteArrayInputStream; -import java.io.DataInput; -import java.io.DataInputStream; -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumMap; -import java.util.Map.Entry; -import java.util.NoSuchElementException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; - -import com.gemstone.gemfire.cache.hdfs.HDFSIOException; -import com.gemstone.gemfire.internal.cache.persistence.soplog.AbstractSortedReader; -import com.gemstone.gemfire.internal.cache.persistence.soplog.DelegatingSerializedComparator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.ReversingSerializedComparator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedBuffer.BufferIterator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.Metadata; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SerializedComparator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedIterator; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedReader.SortedStatistics; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.util.Bytes; -import com.gemstone.gemfire.internal.util.Hex; -import org.apache.hadoop.hbase.io.hfile.CacheConfig; -import org.apache.hadoop.hbase.io.hfile.HFile; -import org.apache.hadoop.hbase.io.hfile.HFile.Reader; -import 
org.apache.hadoop.hbase.io.hfile.HFile.Writer; -import org.apache.hadoop.hbase.io.hfile.HFileScanner; -import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType; -import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics; -import org.apache.hadoop.hbase.util.BloomFilterFactory; -import org.apache.hadoop.hbase.util.BloomFilterWriter; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.logging.log4j.Logger; -import com.gemstone.gemfire.internal.logging.LogService; - -/** - * Provides a soplog backed by an HFile. - * - * @author bakera - */ -public class HFileSortedOplog implements SortedOplog { - public static final byte[] MAGIC = new byte[] { 0x53, 0x4F, 0x50 }; - public static final byte[] VERSION_1 = new byte[] { 0x1 }; - - // FileInfo is not visible - private static final byte[] AVG_KEY_LEN = "hfile.AVG_KEY_LEN".getBytes(); - private static final byte[] AVG_VALUE_LEN = "hfile.AVG_VALUE_LEN".getBytes(); - - /** a default bloom filter */ - private static final BloomFilter DUMMY_BLOOM = new BloomFilter() { - @Override - public boolean mightContain(byte[] key) { - return true; - } - }; - - static final Configuration hconf; - private static final FileSystem fs; - - static { - // Leave these HBase properties set to defaults for now - // - // hfile.block.cache.size (25% of heap) - // hbase.hash.type (murmur) - // hfile.block.index.cacheonwrite (false) - // hfile.index.block.max.size (128k) - // hfile.format.version (2) - // io.storefile.bloom.block.size (128k) - // hfile.block.bloom.cacheonwrite (false) - // hbase.rs.cacheblocksonwrite (false) - // hbase.offheapcache.minblocksize (64k) - // hbase.offheapcache.percentage (0) - hconf = new Configuration(); - - hconf.setBoolean("hbase.metrics.showTableName", true); - SchemaMetrics.configureGlobally(hconf); - - try { - fs = FileSystem.get(hconf); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - - private static enum InternalMetadata { - /** identifies the soplog as a 
gemfire file, required */ - GEMFIRE_MAGIC, - - /** identifies the soplog version, required */ - VERSION, - - /** identifies the statistics data */ - STATISTICS, - - /** identifies the names of embedded comparators */ - COMPARATORS; - - public byte[] bytes() { - return ("gemfire." + name()).getBytes(); - } - } - - //logger instance - private static final Logger logger = LogService.getLogger(); - protected final String logPrefix; - - /** the configuration */ - private final SortedOplogConfiguration sopConfig; - - /** the hfile cache config */ - private final CacheConfig hcache; - - /** the hfile location */ - private Path path; - - public HFileSortedOplog(File hfile, SortedOplogConfiguration sopConfig) throws IOException { - assert hfile != null; - assert sopConfig != null; - - this.sopConfig = sopConfig; - path = fs.makeQualified(new Path(hfile.toString())); - -// hcache = new CacheConfig(hconf, sopConfig.getCacheDataBlocksOnRead(), sopConfig.getBlockCache(), -// HFileSortedOplogFactory.convertStatistics(sopConfig.getStatistics(), sopConfig.getStoreStatistics())); - hcache = new CacheConfig(hconf); - this.logPrefix = "<" + sopConfig.getName() + "> "; - } - - @Override - public SortedOplogReader createReader() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Creating an HFile reader on " + path, logPrefix); - } - - return new HFileSortedOplogReader(); - } - - @Override - public SortedOplogWriter createWriter() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Creating an HFile writer on " + path, logPrefix); - } - - return new HFileSortedOplogWriter(); - } - - SortedOplogConfiguration getConfiguration() { - return sopConfig; - } - - private class HFileSortedOplogReader extends AbstractSortedReader implements SortedOplogReader { - private final Reader reader; - private final BloomFilter bloom; - private final SortedStatistics stats; - private volatile boolean closed; - - public HFileSortedOplogReader() throws IOException { 
- reader = HFile.createReader(fs, path, hcache); - validate(); - - stats = new HFileSortedStatistics(reader); - closed = false; - - if (reader.getComparator() instanceof DelegatingSerializedComparator) { - loadComparators((DelegatingSerializedComparator) reader.getComparator()); - } - - DataInput bin = reader.getGeneralBloomFilterMetadata(); - if (bin != null) { - final org.apache.hadoop.hbase.util.BloomFilter hbloom = BloomFilterFactory.createFromMeta(bin, reader); - if (reader.getComparator() instanceof DelegatingSerializedComparator) { - loadComparators((DelegatingSerializedComparator) hbloom.getComparator()); - } - - bloom = new BloomFilter() { - @Override - public boolean mightContain(byte[] key) { - assert key != null; - - long start = sopConfig.getStatistics().getBloom().begin(); - boolean foundKey = hbloom.contains(key, 0, key.length, null); - sopConfig.getStatistics().getBloom().end(start); - - if (logger.isTraceEnabled()) { - logger.trace(String.format("{}Bloom check on %s for key %s: %b", - path, Hex.toHex(key), foundKey), logPrefix); - } - return foundKey; - } - }; - - } else { - bloom = DUMMY_BLOOM; - } - } - - @Override - public boolean mightContain(byte[] key) { - return getBloomFilter().mightContain(key); - } - - @Override - public ByteBuffer read(byte[] key) throws IOException { - assert key != null; - - if (logger.isTraceEnabled()) { - logger.trace(String.format("{}Reading key %s from %s", Hex.toHex(key), path), logPrefix); - } - - long start = sopConfig.getStatistics().getRead().begin(); - try { - HFileScanner seek = reader.getScanner(true, true); - if (seek.seekTo(key) == 0) { - ByteBuffer val = seek.getValue(); - sopConfig.getStatistics().getRead().end(val.remaining(), start); - - return val; - } - - sopConfig.getStatistics().getRead().end(start); - sopConfig.getStatistics().getBloom().falsePositive(); - return null; - - } catch (IOException e) { - sopConfig.getStatistics().getRead().error(start); - throw (IOException) e.fillInStackTrace(); - } 
- } - - @Override - public SortedIterator<ByteBuffer> scan( - byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive, - boolean ascending, - MetadataFilter filter) throws IOException { - if (filter == null || filter.accept(getMetadata(filter.getName()))) { - SerializedComparator tmp = (SerializedComparator) reader.getComparator(); - tmp = ascending ? tmp : ReversingSerializedComparator.reverse(tmp); - -// HFileScanner scan = reader.getScanner(true, false, ascending, false); - HFileScanner scan = reader.getScanner(true, false, false); - return new HFileSortedIterator(scan, tmp, from, fromInclusive, to, toInclusive); - } - return new BufferIterator(Collections.<byte[], byte[]>emptyMap().entrySet().iterator()); - } - - @Override - public SerializedComparator getComparator() { - return (SerializedComparator) reader.getComparator(); - } - - @Override - public SortedStatistics getStatistics() { - return stats; - } - - @Override - public boolean isClosed() { - return closed; - } - - @Override - public void close() throws IOException { - if (logger.isDebugEnabled()) { - logger.debug("{}Closing reader on " + path, logPrefix); - } - reader.close(); - closed = true; - } - - @Override - public BloomFilter getBloomFilter() { - return bloom; - } - - @Override - public byte[] getMetadata(Metadata name) throws IOException { - assert name != null; - - return reader.loadFileInfo().get(name.bytes()); - } - - @Override - public File getFile() { - return new File(path.toUri()); - } - - @Override - public String getFileName() { - return path.getName(); - } - - @Override - public long getModificationTimeStamp() throws IOException { - FileStatus[] stats = FSUtils.listStatus(fs, path, null); - if (stats != null && stats.length == 1) { - return stats[0].getModificationTime(); - } else { - return 0; - } - } - - @Override - public void rename(String name) throws IOException { - Path parent = path.getParent(); - Path newPath = new Path(parent, name); - fs.rename(path, newPath); - 
// update path to point to the new path - path = newPath; - } - - @Override - public void delete() throws IOException { - fs.delete(path, false); - } - - @Override - public String toString() { - return path.toString(); - } - - private byte[] getMetadata(InternalMetadata name) throws IOException { - return reader.loadFileInfo().get(name.bytes()); - } - - private void validate() throws IOException { - // check magic - byte[] magic = getMetadata(InternalMetadata.GEMFIRE_MAGIC); - if (!Arrays.equals(magic, MAGIC)) { - throw new IOException(LocalizedStrings.Soplog_INVALID_MAGIC.toLocalizedString(Hex.toHex(magic))); - } - - // check version compatibility - byte[] ver = getMetadata(InternalMetadata.VERSION); - if (logger.isDebugEnabled()) { - logger.debug("{}Soplog version is " + Hex.toHex(ver), logPrefix); - } - - if (!Arrays.equals(ver, VERSION_1)) { - throw new IOException(LocalizedStrings.Soplog_UNRECOGNIZED_VERSION.toLocalizedString(Hex.toHex(ver))); - } - } - - private void loadComparators(DelegatingSerializedComparator comparator) throws IOException { - byte[] raw = reader.loadFileInfo().get(InternalMetadata.COMPARATORS.bytes()); - assert raw != null; - - DataInput in = new DataInputStream(new ByteArrayInputStream(raw)); - comparator.setComparators(readComparators(in)); - } - - private SerializedComparator[] readComparators(DataInput in) throws IOException { - try { - SerializedComparator[] comps = new SerializedComparator[in.readInt()]; - assert comps.length > 0; - - for (int i = 0; i < comps.length; i++) { - comps[i] = (SerializedComparator) Class.forName(in.readUTF()).newInstance(); - if (comps[i] instanceof DelegatingSerializedComparator) { - ((DelegatingSerializedComparator) comps[i]).setComparators(readComparators(in)); - } - } - return comps; - - } catch (Exception e) { - throw new IOException(e); - } - } - } - - private class HFileSortedOplogWriter implements SortedOplogWriter { - private final Writer writer; - private final BloomFilterWriter bfw; - - 
public HFileSortedOplogWriter() throws IOException { - writer = HFile.getWriterFactory(hconf, hcache) - .withPath(fs, path) - .withBlockSize(sopConfig.getBlockSize()) - .withBytesPerChecksum(sopConfig.getBytesPerChecksum()) - .withChecksumType(HFileSortedOplogFactory.convertChecksum(sopConfig.getChecksum())) -// .withComparator(sopConfig.getComparator()) - .withCompression(HFileSortedOplogFactory.convertCompression(sopConfig.getCompression())) - .withDataBlockEncoder(HFileSortedOplogFactory.convertEncoding(sopConfig.getKeyEncoding())) - .create(); - - bfw = sopConfig.isBloomFilterEnabled() ? -// BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW, -// 0, writer, sopConfig.getComparator()) - BloomFilterFactory.createGeneralBloomAtWrite(hconf, hcache, BloomType.ROW, - 0, writer) - : null; - } - - @Override - public void append(byte[] key, byte[] value) throws IOException { - assert key != null; - assert value != null; - - if (logger.isTraceEnabled()) { - logger.trace(String.format("{}Appending key %s to %s", Hex.toHex(key), path), logPrefix); - } - - try { - writer.append(key, value); - if (bfw != null) { - bfw.add(key, 0, key.length); - } - } catch (IOException e) { - throw (IOException) e.fillInStackTrace(); - } - } - - @Override - public void append(ByteBuffer key, ByteBuffer value) throws IOException { - assert key != null; - assert value != null; - - if (logger.isTraceEnabled()) { - logger.trace(String.format("{}Appending key %s to %s", - Hex.toHex(key.array(), key.arrayOffset(), key.remaining()), path), logPrefix); - } - - try { - byte[] keyBytes = new byte[key.remaining()]; - key.duplicate().get(keyBytes); - byte[] valueBytes = new byte[value.remaining()]; - value.duplicate().get(valueBytes); - writer.append(keyBytes, valueBytes); - if (bfw != null) { - bfw.add(key.array(), key.arrayOffset(), key.remaining()); - } - } catch (IOException e) { - throw (IOException) e.fillInStackTrace(); - } - } - - @Override - public void 
close(EnumMap<Metadata, byte[]> metadata) throws IOException { - if (logger.isTraceEnabled()) { - logger.debug("{}Finalizing and closing writer on " + path, logPrefix); - } - - if (bfw != null) { - bfw.compactBloom(); - writer.addGeneralBloomFilter(bfw); - } - - // append system metadata - writer.appendFileInfo(InternalMetadata.GEMFIRE_MAGIC.bytes(), MAGIC); - writer.appendFileInfo(InternalMetadata.VERSION.bytes(), VERSION_1); - - // append comparator info -// if (writer.getComparator() instanceof DelegatingSerializedComparator) { -// ByteArrayOutputStream bos = new ByteArrayOutputStream(); -// DataOutput out = new DataOutputStream(bos); -// -// writeComparatorInfo(out, ((DelegatingSerializedComparator) writer.getComparator()).getComparators()); -// writer.appendFileInfo(InternalMetadata.COMPARATORS.bytes(), bos.toByteArray()); -// } - - // TODO write statistics data to soplog - // writer.appendFileInfo(Meta.STATISTICS.toBytes(), null); - - // append user metadata - if (metadata != null) { - for (Entry<Metadata, byte[]> entry : metadata.entrySet()) { - writer.appendFileInfo(entry.getKey().name().getBytes(), entry.getValue()); - } - } - - writer.close(); - } - - @Override - public void closeAndDelete() throws IOException { - if (logger.isTraceEnabled()) { - logger.debug("{}Closing writer and deleting " + path, logPrefix); - } - - writer.close(); - new File(writer.getPath().toUri()).delete(); - } - -// private void writeComparatorInfo(DataOutput out, SerializedComparator[] comparators) throws IOException { -// out.writeInt(comparators.length); -// for (SerializedComparator sc : comparators) { -// out.writeUTF(sc.getClass().getName()); -// if (sc instanceof DelegatingSerializedComparator) { -// writeComparatorInfo(out, ((DelegatingSerializedComparator) sc).getComparators()); -// } -// } -// } - } - - private class HFileSortedIterator implements SortedIterator<ByteBuffer> { - private final HFileScanner scan; - private final SerializedComparator comparator; - - private 
final byte[] from; - private final boolean fromInclusive; - - private final byte[] to; - private final boolean toInclusive; - - private final long start; - private long bytes; - - private boolean foundNext; - - private ByteBuffer key; - private ByteBuffer value; - - public HFileSortedIterator(HFileScanner scan, SerializedComparator comparator, - byte[] from, boolean fromInclusive, - byte[] to, boolean toInclusive) throws IOException { - this.scan = scan; - this.comparator = comparator; - this.from = from; - this.fromInclusive = fromInclusive; - this.to = to; - this.toInclusive = toInclusive; - - assert from == null - || to == null - || comparator.compare(from, 0, from.length, to, 0, to.length) <= 0; - - start = sopConfig.getStatistics().getScan().begin(); - foundNext = evalFrom(); - } - - @Override - public ByteBuffer key() { - return key; - } - - @Override - public ByteBuffer value() { - return value; - } - - @Override - public boolean hasNext() { - if (!foundNext) { - foundNext = step(); - } - return foundNext; - } - - @Override - public ByteBuffer next() { - long startNext = sopConfig.getStatistics().getScan().beginIteration(); - - if (!hasNext()) { - throw new NoSuchElementException(); - } - - foundNext = false; - key = scan.getKey(); - value = scan.getValue(); - - int len = key.remaining() + value.remaining(); - bytes += len; - sopConfig.getStatistics().getScan().endIteration(len, startNext); - - return key; - } - - @Override - public void remove() { - throw new UnsupportedOperationException(); - } - - @Override - public void close() { - sopConfig.getStatistics().getScan().end(bytes, start); - } - - private boolean step() { - try { - if (!scan.isSeeked()) { - return false; - - } else if (scan.next() && evalTo()) { - return true; - } - } catch (IOException e) { - throw new HDFSIOException("Error from HDFS during iteration", e); - } - return false; - } - - private boolean evalFrom() throws IOException { - if (from == null) { - return scan.seekTo() && evalTo(); - 
- } else { - int compare = scan.seekTo(from); - if (compare < 0) { - return scan.seekTo() && evalTo(); - - } else if (compare == 0 && fromInclusive) { - return true; - - } else { - return step(); - } - } - } - - private boolean evalTo() throws IOException { - int compare = -1; - if (to != null) { - ByteBuffer key = scan.getKey(); - compare = comparator.compare( - key.array(), key.arrayOffset(), key.remaining(), - to, 0, to.length); - } - - return compare < 0 || (compare == 0 && toInclusive); - } - } - - private static class HFileSortedStatistics implements SortedStatistics { - private final Reader reader; - private final int keySize; - private final int valueSize; - - public HFileSortedStatistics(Reader reader) throws IOException { - this.reader = reader; - - byte[] sz = reader.loadFileInfo().get(AVG_KEY_LEN); - keySize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]); - - sz = reader.loadFileInfo().get(AVG_VALUE_LEN); - valueSize = Bytes.toInt(sz[0], sz[1], sz[2], sz[3]); - } - - @Override - public long keyCount() { - return reader.getEntries(); - } - - @Override - public byte[] firstKey() { - return reader.getFirstKey(); - } - - @Override - public byte[] lastKey() { - return reader.getLastKey(); - } - - @Override - public double avgKeySize() { - return keySize; - } - - @Override - public double avgValueSize() { - return valueSize; - } - - @Override - public void close() { - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9438c8b1/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java deleted file mode 100644 index 9546fd3..0000000 --- 
a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/persistence/soplog/hfile/HFileSortedOplogFactory.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.gemstone.gemfire.internal.cache.persistence.soplog.hfile; - -import java.io.File; -import java.io.IOException; - -import org.apache.hadoop.hbase.io.hfile.BlockCache; -import org.apache.hadoop.hbase.io.hfile.Compression.Algorithm; -import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder; -import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; -import org.apache.hadoop.hbase.util.ChecksumType; - -import com.gemstone.gemfire.internal.cache.persistence.soplog.HFileStoreStatistics; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplog; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Checksum; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.Compression; -import com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogFactory.SortedOplogConfiguration.KeyEncoding; -import 
com.gemstone.gemfire.internal.cache.persistence.soplog.SortedOplogStatistics; - -/** - * Creates HFile soplogs. - * - * @author bakera - */ -public class HFileSortedOplogFactory implements SortedOplogFactory { - private final SortedOplogConfiguration config; - - public HFileSortedOplogFactory(String name, BlockCache blockCache, SortedOplogStatistics stats, HFileStoreStatistics storeStats) { - config = new SortedOplogConfiguration(name, blockCache, stats, storeStats); - } - - @Override - public SortedOplogConfiguration getConfiguration() { - return config; - } - - @Override - public SortedOplog createSortedOplog(File name) throws IOException { - return new HFileSortedOplog(name, config); - } - - public static ChecksumType convertChecksum(Checksum type) { - switch (type) { - case NONE: return ChecksumType.NULL; - - default: - case CRC32: return ChecksumType.CRC32; - } - } - - public static Algorithm convertCompression(Compression type) { - switch (type) { - default: - case NONE: return Algorithm.NONE; - } - } - - public static HFileDataBlockEncoder convertEncoding(KeyEncoding type) { - switch (type) { - default: - case NONE: return NoOpDataBlockEncoder.INSTANCE; - } - } -}
