http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/ColumnFamilyStore.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index c4377d6..d77cf1f 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -34,19 +34,13 @@ import com.google.common.base.*; import com.google.common.base.Throwables; import com.google.common.collect.*; import com.google.common.util.concurrent.*; - -import org.apache.cassandra.db.lifecycle.*; -import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.metrics.TableMetrics; -import org.json.simple.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.clearspring.analytics.stream.Counter; import org.apache.cassandra.cache.*; import org.apache.cassandra.concurrent.*; import org.apache.cassandra.config.*; -import org.apache.cassandra.config.CFMetaData.SpeculativeRetry; -import org.apache.cassandra.db.rows.*; import org.apache.cassandra.db.commitlog.CommitLog; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.*; @@ -54,24 +48,28 @@ import org.apache.cassandra.db.filter.*; import org.apache.cassandra.db.index.SecondaryIndex; import org.apache.cassandra.db.index.SecondaryIndexManager; import org.apache.cassandra.db.view.MaterializedViewManager; +import org.apache.cassandra.db.lifecycle.*; import org.apache.cassandra.db.partitions.*; +import org.apache.cassandra.db.rows.*; import org.apache.cassandra.dht.*; import org.apache.cassandra.dht.Range; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.compress.CompressionParameters; -import org.apache.cassandra.io.sstable.Descriptor; +import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.sstable.*; +import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.format.*; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.metrics.TableMetrics.Sampler; +import org.apache.cassandra.metrics.TableMetrics; +import org.apache.cassandra.schema.*; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.utils.*; -import org.apache.cassandra.utils.concurrent.*; import org.apache.cassandra.utils.TopKSampler.SamplerResult; +import org.apache.cassandra.utils.concurrent.*; import org.apache.cassandra.utils.memory.MemtableAllocator; +import org.json.simple.*; -import com.clearspring.analytics.stream.Counter; import static org.apache.cassandra.utils.Throwables.maybeFail; @@ -185,10 +183,10 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // only update these runtime-modifiable settings if they have not been modified. if (!minCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold()); + cfs.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold()); if (!maxCompactionThreshold.isModified()) for (ColumnFamilyStore cfs : concatWithIndexes()) - cfs.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold()); + cfs.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold()); compactionStrategyManager.maybeReload(metadata); @@ -205,7 +203,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean void scheduleFlush() { - int period = metadata.getMemtableFlushPeriod(); + int period = metadata.params.memtableFlushPeriodInMs; if (period > 0) { logger.debug("scheduling flush in {} ms", period); @@ -252,33 +250,25 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public void setCompactionStrategyClass(String compactionStrategyClass) { - try - { - metadata.compactionStrategyClass = CFMetaData.createCompactionStrategy(compactionStrategyClass); - compactionStrategyManager.maybeReload(metadata); - } - catch (ConfigurationException e) - { - throw new IllegalArgumentException(e.getMessage()); - } + throw new UnsupportedOperationException("ColumnFamilyStore.setCompactionStrategyClass() method is no longer supported"); } public String getCompactionStrategyClass() { - return metadata.compactionStrategyClass.getName(); + return metadata.params.compaction.klass().getName(); } public Map<String,String> getCompressionParameters() { - return metadata.compressionParameters().asMap(); + return metadata.params.compression.asMap(); } public void setCompressionParameters(Map<String,String> opts) { try { - metadata.compressionParameters = CompressionParameters.fromMap(opts); - metadata.compressionParameters.validate(); + metadata.compression(CompressionParams.fromMap(opts)); + metadata.params.compression.validate(); } catch (ConfigurationException e) { @@ -326,8 +316,8 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean this.keyspace = keyspace; name = columnFamilyName; this.metadata = metadata; - this.minCompactionThreshold = new DefaultInteger(metadata.getMinCompactionThreshold()); - this.maxCompactionThreshold = new DefaultInteger(metadata.getMaxCompactionThreshold()); + this.minCompactionThreshold = new DefaultInteger(metadata.params.compaction.minCompactionThreshold()); + this.maxCompactionThreshold = new DefaultInteger(metadata.params.compaction.maxCompactionThreshold()); this.directories = directories; this.indexManager = new SecondaryIndexManager(this); this.materializedViewManager = new MaterializedViewManager(this); @@ -335,7 +325,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean fileIndexGenerator.set(generation); sampleLatencyNanos = DatabaseDescriptor.getReadRpcTimeout() / 2; - CachingOptions caching = metadata.getCaching(); + CachingParams caching = metadata.params.caching; logger.info("Initializing {}.{}", keyspace.getName(), name); @@ -349,7 +339,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean data.addInitialSSTables(sstables); } - if (caching.keyCache.isEnabled()) + if (caching.cacheKeys()) CacheService.instance.keyCache.loadSaved(this); // compaction strategy should be created after the CFS has been prepared @@ -390,21 +380,20 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean { throw new RuntimeException(e); } - logger.debug("retryPolicy for {} is {}", name, this.metadata.getSpeculativeRetry()); + logger.debug("retryPolicy for {} is {}", name, this.metadata.params.speculativeRetry); latencyCalculator = ScheduledExecutors.optionalTasks.scheduleWithFixedDelay(new Runnable() { public void run() { - SpeculativeRetry retryPolicy = ColumnFamilyStore.this.metadata.getSpeculativeRetry(); - switch (retryPolicy.type) + SpeculativeRetryParam retryPolicy = ColumnFamilyStore.this.metadata.params.speculativeRetry; + switch (retryPolicy.kind()) { case PERCENTILE: // get percentile in nanos - sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.value) * 1000d); + sampleLatencyNanos = (long) (metric.coordinatorReadLatency.getSnapshot().getValue(retryPolicy.threshold()) * 1000d); break; case CUSTOM: - // convert to nanos, since configuration is in millisecond - sampleLatencyNanos = (long) (retryPolicy.value * 1000d * 1000d); + sampleLatencyNanos = (long) retryPolicy.threshold(); break; default: sampleLatencyNanos = Long.MAX_VALUE; @@ -1386,7 +1375,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean // what we're caching. Wen doing that, we should be careful about expiring cells: we should count // something expired that wasn't when the partition was cached, or we could decide that the whole // partition is cached when it's not. This is why we use CachedPartition#cachedLiveRows. - if (cached.cachedLiveRows() < metadata.getCaching().rowCache.rowsToCache) + if (cached.cachedLiveRows() < metadata.params.caching.rowsPerPartitionToCache()) return true; // If the whole partition isn't cached, then we must guarantee that the filter cannot select data that @@ -1398,7 +1387,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public int gcBefore(int nowInSec) { - return nowInSec - metadata.getGcGraceSeconds(); + return nowInSec - metadata.params.gcGraceSeconds; } @SuppressWarnings("resource") @@ -2153,7 +2142,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean public boolean isRowCacheEnabled() { - return metadata.getCaching().rowCache.isEnabled() && CacheService.instance.rowCache.getCapacity() > 0; + return metadata.params.caching.cacheRows() && CacheService.instance.rowCache.getCapacity() > 0; } /** @@ -2193,7 +2182,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean for (SSTableReader sstable : getSSTables(SSTableSet.LIVE)) { - allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.getGcGraceSeconds()); + allDroppable += sstable.getDroppableTombstonesBefore(localTime - sstable.metadata.params.gcGraceSeconds); allColumns += sstable.getEstimatedColumnCount().mean() * sstable.getEstimatedColumnCount().count(); } return allColumns > 0 ? allDroppable / allColumns : 0;
http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/HintedHandOffManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/HintedHandOffManager.java b/src/java/org/apache/cassandra/db/HintedHandOffManager.java index 73189a6..17832d7 100644 --- a/src/java/org/apache/cassandra/db/HintedHandOffManager.java +++ b/src/java/org/apache/cassandra/db/HintedHandOffManager.java @@ -147,7 +147,7 @@ public class HintedHandOffManager implements HintedHandOffManagerMBean { int ttl = maxHintTTL; for (PartitionUpdate upd : mutation.getPartitionUpdates()) - ttl = Math.min(ttl, upd.metadata().getGcGraceSeconds()); + ttl = Math.min(ttl, upd.metadata().params.gcGraceSeconds); return ttl; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/LivenessInfo.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/LivenessInfo.java b/src/java/org/apache/cassandra/db/LivenessInfo.java index 8f7b1c2..3c87030 100644 --- a/src/java/org/apache/cassandra/db/LivenessInfo.java +++ b/src/java/org/apache/cassandra/db/LivenessInfo.java @@ -48,7 +48,7 @@ public class LivenessInfo public static LivenessInfo create(CFMetaData metadata, long timestamp, int nowInSec) { - int defaultTTL = metadata.getDefaultTimeToLive(); + int defaultTTL = metadata.params.defaultTimeToLive; if (defaultTTL != NO_TTL) return expiring(timestamp, defaultTTL, nowInSec); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/Memtable.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/Memtable.java b/src/java/org/apache/cassandra/db/Memtable.java index 5ec9fe5..a950e17 100644 --- a/src/java/org/apache/cassandra/db/Memtable.java +++ b/src/java/org/apache/cassandra/db/Memtable.java @@ -199,7 +199,7 @@ public class Memtable implements Comparable<Memtable> */ public boolean isExpired() { - int period = cfs.metadata.getMemtableFlushPeriod(); + int period = cfs.metadata.params.memtableFlushPeriodInMs; return period > 0 && (System.nanoTime() - creationNano >= TimeUnit.MILLISECONDS.toNanos(period)); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/RowUpdateBuilder.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java index e4f05b0..372ba04 100644 --- a/src/java/org/apache/cassandra/db/RowUpdateBuilder.java +++ b/src/java/org/apache/cassandra/db/RowUpdateBuilder.java @@ -123,7 +123,7 @@ public class RowUpdateBuilder public RowUpdateBuilder(CFMetaData metadata, int localDeletionTime, long timestamp, Object partitionKey) { - this(metadata, localDeletionTime, timestamp, metadata.getDefaultTimeToLive(), partitionKey); + this(metadata, localDeletionTime, timestamp, metadata.params.defaultTimeToLive, partitionKey); } public RowUpdateBuilder(CFMetaData metadata, long timestamp, int ttl, Object partitionKey) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 6e9e2d5..bb184e8 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -292,7 +292,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter cfs.metric.rowCacheMiss.inc(); Tracing.trace("Row cache miss"); - boolean cacheFullPartitions = metadata().getCaching().rowCache.cacheFullPartitions(); + boolean cacheFullPartitions = metadata().params.caching.cacheAllRows(); // To be able to cache what we read, what we read must at least covers what the cache holds, that // is the 'rowsToCache' first rows of the partition. We could read those 'rowsToCache' first rows @@ -309,7 +309,7 @@ public abstract class SinglePartitionReadCommand<F extends ClusteringIndexFilter try { - int rowsToCache = cacheFullPartitions ? Integer.MAX_VALUE : metadata().getCaching().rowCache.rowsToCache; + int rowsToCache = metadata().params.caching.rowsPerPartitionToCache(); @SuppressWarnings("resource") // we close on exception or upon closing the result of this method UnfilteredRowIterator iter = SinglePartitionReadCommand.fullPartitionRead(metadata(), nowInSec(), partitionKey()).queryMemtableAndDisk(cfs, readOp); try http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/SystemKeyspace.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index d17eaf7..2d0ca24 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -31,6 +31,7 @@ import com.google.common.io.ByteStreams; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.cassandra.config.CFMetaData; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.cql3.QueryProcessor; @@ -39,7 +40,6 @@ import org.apache.cassandra.cql3.functions.*; import org.apache.cassandra.db.partitions.*; import org.apache.cassandra.db.commitlog.ReplayPosition; import org.apache.cassandra.db.compaction.CompactionHistoryTabularData; -import org.apache.cassandra.db.compaction.LeveledCompactionStrategy; import org.apache.cassandra.db.marshal.*; import org.apache.cassandra.dht.IPartitioner; import org.apache.cassandra.dht.Range; @@ -53,12 +53,8 @@ import org.apache.cassandra.io.util.NIODataInputStream; import org.apache.cassandra.locator.IEndpointSnitch; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.net.MessagingService; -import org.apache.cassandra.schema.Functions; -import org.apache.cassandra.schema.KeyspaceMetadata; -import org.apache.cassandra.schema.KeyspaceParams; -import org.apache.cassandra.schema.SchemaKeyspace; +import org.apache.cassandra.schema.*; import org.apache.cassandra.schema.Tables; -import org.apache.cassandra.schema.Types; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.paxos.Commit; import org.apache.cassandra.service.paxos.PaxosState; @@ -66,6 +62,9 @@ import org.apache.cassandra.thrift.cassandraConstants; import org.apache.cassandra.transport.Server; import org.apache.cassandra.utils.*; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; + import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; import static org.apache.cassandra.cql3.QueryProcessor.executeOnceInternal; @@ -121,7 +120,7 @@ public final class SystemKeyspace + "mutation blob," + "PRIMARY KEY ((target_id), hint_id, message_version)) " + "WITH COMPACT STORAGE") - .compactionStrategyOptions(Collections.singletonMap("enabled", "false")) + .compaction(CompactionParams.scts(singletonMap("enabled", "false"))) .gcGraceSeconds(0); public static final CFMetaData Batchlog = @@ -133,7 +132,7 @@ public final class SystemKeyspace + "version int," + "written_at timestamp," + "PRIMARY KEY ((id)))") - .compactionStrategyOptions(Collections.singletonMap("min_threshold", "2")) + .compaction(CompactionParams.scts(singletonMap("min_threshold", "2"))) .gcGraceSeconds(0); private static final CFMetaData Paxos = @@ -150,7 +149,7 @@ public final class SystemKeyspace + "proposal_ballot timeuuid," + "proposal_version int," + "PRIMARY KEY ((row_key), cf_id))") - .compactionStrategyClass(LeveledCompactionStrategy.class); + .compaction(CompactionParams.lcs(emptyMap())); private static final CFMetaData BuiltIndexes = compile(BUILT_INDEXES, @@ -610,7 +609,7 @@ public final class SystemKeyspace { ReplayPosition.serializer.serialize(position, out); out.writeLong(truncatedAt); - return Collections.singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); + return singletonMap(cfs.metadata.cfId, ByteBuffer.wrap(out.getData(), 0, out.getLength())); } catch (IOException e) { @@ -1093,7 +1092,7 @@ public final class SystemKeyspace private static int paxosTtl(CFMetaData metadata) { // keep paxos state around for at least 3h - return Math.max(3 * 3600, metadata.getGcGraceSeconds()); + return Math.max(3 * 3600, metadata.params.gcGraceSeconds); } public static void savePaxosCommit(Commit commit) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/commitlog/CommitLog.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java index c50beb6..e096011 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLog.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLog.java @@ -38,7 +38,7 @@ import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.config.ParameterizedClass; import org.apache.cassandra.db.*; import org.apache.cassandra.io.FSWriteError; -import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.BufferedDataOutputStreamPlus; import org.apache.cassandra.io.util.DataOutputBufferFixed; @@ -93,7 +93,7 @@ public class CommitLog implements CommitLogMBean { compressorClass = DatabaseDescriptor.getCommitLogCompression(); this.location = location; - ICompressor compressor = compressorClass != null ? CompressionParameters.createCompressor(compressorClass) : null; + ICompressor compressor = compressorClass != null ? CompressionParams.createCompressor(compressorClass) : null; DatabaseDescriptor.createAllDirectories(); this.compressor = compressor; http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java index ec270dd..e7d115d 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogArchiver.java @@ -33,8 +33,7 @@ import java.util.concurrent.*; import org.apache.cassandra.concurrent.JMXEnabledThreadPoolExecutor; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.compress.CompressionParameters; -import org.apache.cassandra.io.util.FileUtils; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.WrappedRunnable; import org.slf4j.Logger; @@ -212,7 +211,7 @@ public class CommitLogArchiver if (descriptor.compression != null) { try { - CompressionParameters.createCompressor(descriptor.compression); + CompressionParams.createCompressor(descriptor.compression); } catch (ConfigurationException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java index 1e12ed6..d232a63 100644 --- a/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java +++ b/src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java @@ -47,7 +47,7 @@ import org.apache.cassandra.db.rows.SerializationHelper; import org.apache.cassandra.db.partitions.PartitionUpdate; import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.compress.ICompressor; import org.apache.cassandra.io.util.ByteBufferDataInput; import org.apache.cassandra.io.util.DataInputBuffer; @@ -307,7 +307,7 @@ public class CommitLogReplayer { try { - compressor = CompressionParameters.createCompressor(desc.compression); + compressor = CompressionParams.createCompressor(desc.compression); } catch (ConfigurationException e) { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/compaction/CompactionManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java index fc0acff..92cc249 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionManager.java @@ -808,7 +808,7 @@ public class CompactionManager implements CompactionManagerMBean long totalkeysWritten = 0; - long expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), + long expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, SSTableReader.getApproximateKeyCount(txn.originals())); if (logger.isDebugEnabled()) logger.debug("Expected bloom filter size : {}", expectedBloomFilterSize); @@ -1201,7 +1201,7 @@ public class CompactionManager implements CompactionManagerMBean CompactionController controller = new CompactionController(cfs, sstableAsSet, getDefaultGcBefore(cfs, nowInSec)); CompactionIterator ci = new CompactionIterator(OperationType.ANTICOMPACTION, scanners.scanners, controller, nowInSec, UUIDGen.getTimeUUID(), metrics)) { - int expectedBloomFilterSize = Math.max(cfs.metadata.getMinIndexInterval(), (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); + int expectedBloomFilterSize = Math.max(cfs.metadata.params.minIndexInterval, (int)(SSTableReader.getApproximateKeyCount(sstableAsSet))); repairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, repairedAt, sstableAsSet, anticompactionGroup)); unRepairedSSTableWriter.switchWriter(CompactionManager.createWriterForAntiCompaction(cfs, destination, expectedBloomFilterSize, ActiveRepairService.UNREPAIRED_SSTABLE, sstableAsSet, anticompactionGroup)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java index e2ae6b6..d1b004d 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionStrategyManager.java @@ -18,16 +18,9 @@ package org.apache.cassandra.db.compaction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; -import com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,17 +29,11 @@ import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.Memtable; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.View; import org.apache.cassandra.dht.Range; import org.apache.cassandra.dht.Token; import org.apache.cassandra.io.sstable.format.SSTableReader; import org.apache.cassandra.io.sstable.ISSTableScanner; -import org.apache.cassandra.notifications.INotification; -import org.apache.cassandra.notifications.INotificationConsumer; -import org.apache.cassandra.notifications.SSTableAddedNotification; -import org.apache.cassandra.notifications.SSTableDeletingNotification; -import org.apache.cassandra.notifications.SSTableListChangedNotification; -import org.apache.cassandra.notifications.SSTableRepairStatusChanged; +import org.apache.cassandra.notifications.*; /** * Manages the compaction strategies. @@ -56,14 +43,12 @@ import org.apache.cassandra.notifications.SSTableRepairStatusChanged; */ public class CompactionStrategyManager implements INotificationConsumer { - protected static final String COMPACTION_ENABLED = "enabled"; private static final Logger logger = LoggerFactory.getLogger(CompactionStrategyManager.class); private final ColumnFamilyStore cfs; private volatile AbstractCompactionStrategy repaired; private volatile AbstractCompactionStrategy unrepaired; private volatile boolean enabled = true; public boolean isActive = true; - private Map<String, String> options; public CompactionStrategyManager(ColumnFamilyStore cfs) { @@ -71,9 +56,7 @@ public class CompactionStrategyManager implements INotificationConsumer logger.debug("{} subscribed to the data tracker.", this); this.cfs = cfs; reload(cfs.metadata); - String optionValue = cfs.metadata.compactionStrategyOptions.get(COMPACTION_ENABLED); - enabled = optionValue == null || Boolean.parseBoolean(optionValue); - options = ImmutableMap.copyOf(cfs.metadata.compactionStrategyOptions); + enabled = cfs.metadata.params.compaction.isEnabled(); } /** @@ -159,13 +142,12 @@ public class CompactionStrategyManager implements INotificationConsumer unrepaired.shutdown(); } - public synchronized void maybeReload(CFMetaData metadata) { - if (repaired != null && repaired.getClass().equals(metadata.compactionStrategyClass) - && unrepaired != null && unrepaired.getClass().equals(metadata.compactionStrategyClass) - && repaired.options.equals(metadata.compactionStrategyOptions) // todo: assumes all have the same options - && unrepaired.options.equals(metadata.compactionStrategyOptions)) + if (repaired != null && repaired.getClass().equals(metadata.params.compaction.klass()) + && unrepaired != null && unrepaired.getClass().equals(metadata.params.compaction.klass()) + && repaired.options.equals(metadata.params.compaction.options()) // todo: assumes all have the same options + && unrepaired.options.equals(metadata.params.compaction.options())) return; reload(metadata); } @@ -185,7 +167,6 @@ public class CompactionStrategyManager implements INotificationConsumer unrepaired.shutdown(); repaired = metadata.createCompactionStrategyInstance(cfs); unrepaired = metadata.createCompactionStrategyInstance(cfs); - options = ImmutableMap.copyOf(metadata.compactionStrategyOptions); if (disabledWithJMX || !shouldBeEnabled()) disable(); else @@ -445,8 +426,7 @@ public class CompactionStrategyManager implements INotificationConsumer public boolean shouldBeEnabled() { - String optionValue = options.get(COMPACTION_ENABLED); - return optionValue == null || Boolean.parseBoolean(optionValue); + return cfs.metadata.params.compaction.isEnabled(); } public String getName() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java index 4f5e371..8fa3b8f 100644 --- a/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/DateTieredCompactionStrategy.java @@ -22,17 +22,15 @@ import java.util.*; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; - -import org.apache.cassandra.db.lifecycle.SSTableSet; -import org.apache.cassandra.db.lifecycle.View; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; +import org.apache.cassandra.db.lifecycle.SSTableSet; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.utils.Pair; import static com.google.common.collect.Iterables.filter; @@ -422,8 +420,8 @@ public class DateTieredCompactionStrategy extends AbstractCompactionStrategy Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); uncheckedOptions = DateTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); - uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD); - uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD); + uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString()); + uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString()); return uncheckedOptions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/compaction/Scrubber.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Scrubber.java b/src/java/org/apache/cassandra/db/compaction/Scrubber.java index 57fe267..891fac8 100644 --- a/src/java/org/apache/cassandra/db/compaction/Scrubber.java +++ b/src/java/org/apache/cassandra/db/compaction/Scrubber.java @@ -124,7 +124,7 @@ public class Scrubber implements Closeable } this.checkData = checkData && !this.isIndex; //LocalByPartitionerType does not support validation this.expectedBloomFilterSize = Math.max( - cfs.metadata.getMinIndexInterval(), + cfs.metadata.params.minIndexInterval, hasIndexFile ? SSTableReader.getApproximateKeyCount(toScrub) : 0); // loop through each row, deserializing to check for damage. http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java index fa3918f..09d40c8 100644 --- a/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java +++ b/src/java/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategy.java @@ -22,18 +22,16 @@ import java.util.Map.Entry; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterables; -import com.google.common.collect.Sets; - -import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; -import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; -import org.apache.cassandra.io.sstable.format.SSTableReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.cassandra.cql3.statements.CFPropDefs; import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter; +import org.apache.cassandra.db.compaction.writers.SplittingSizeTieredCompactionWriter; import org.apache.cassandra.db.lifecycle.LifecycleTransaction; import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.io.sstable.format.SSTableReader; +import org.apache.cassandra.schema.CompactionParams; import org.apache.cassandra.utils.Pair; import static com.google.common.collect.Iterables.filter; @@ -304,8 +302,8 @@ public class SizeTieredCompactionStrategy extends AbstractCompactionStrategy Map<String, String> uncheckedOptions = AbstractCompactionStrategy.validateOptions(options); uncheckedOptions = SizeTieredCompactionStrategyOptions.validateOptions(options, uncheckedOptions); - uncheckedOptions.remove(CFPropDefs.KW_MINCOMPACTIONTHRESHOLD); - uncheckedOptions.remove(CFPropDefs.KW_MAXCOMPACTIONTHRESHOLD); + uncheckedOptions.remove(CompactionParams.Option.MIN_THRESHOLD.toString()); + uncheckedOptions.remove(CompactionParams.Option.MAX_THRESHOLD.toString()); return uncheckedOptions; } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/compaction/Upgrader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/compaction/Upgrader.java b/src/java/org/apache/cassandra/db/compaction/Upgrader.java index b8a102e..5a36210 100644 --- a/src/java/org/apache/cassandra/db/compaction/Upgrader.java +++ b/src/java/org/apache/cassandra/db/compaction/Upgrader.java @@ -61,7 +61,7 @@ public class Upgrader this.controller = new UpgradeController(cfs); this.strategyManager = cfs.getCompactionStrategyManager(); - long estimatedTotalKeys = Math.max(cfs.metadata.getMinIndexInterval(), SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable))); + long estimatedTotalKeys = Math.max(cfs.metadata.params.minIndexInterval, SSTableReader.getApproximateKeyCount(Arrays.asList(this.sstable))); long estimatedSSTables = Math.max(1, SSTableReader.getTotalBytes(Arrays.asList(this.sstable)) / strategyManager.getMaxSSTableBytes()); this.estimatedRows = (long) Math.ceil((double) estimatedTotalKeys / estimatedSSTables); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/rows/BufferCell.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/rows/BufferCell.java b/src/java/org/apache/cassandra/db/rows/BufferCell.java index 81c42d4..f9a3026 100644 --- a/src/java/org/apache/cassandra/db/rows/BufferCell.java +++ b/src/java/org/apache/cassandra/db/rows/BufferCell.java @@ -61,8 +61,8 @@ public class BufferCell extends AbstractCell public static BufferCell live(CFMetaData metadata, ColumnDefinition column, long timestamp, ByteBuffer value, CellPath path) { - if (metadata.getDefaultTimeToLive() != NO_TTL) - return expiring(column, timestamp, metadata.getDefaultTimeToLive(), FBUtilities.nowInSeconds(), value, path); + if (metadata.params.defaultTimeToLive != NO_TTL) + return expiring(column, timestamp, metadata.params.defaultTimeToLive, FBUtilities.nowInSeconds(), value, path); return new BufferCell(column, timestamp, NO_TTL, NO_DELETION_TIME, value, path); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/db/view/MaterializedView.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/db/view/MaterializedView.java b/src/java/org/apache/cassandra/db/view/MaterializedView.java index f36abae..988bfc5 100644 --- a/src/java/org/apache/cassandra/db/view/MaterializedView.java +++ b/src/java/org/apache/cassandra/db/view/MaterializedView.java @@ -684,9 +684,6 @@ public class MaterializedView } } - CFMetaData cfm = viewBuilder.build(); - properties.properties.applyToCFMetadata(cfm); - - return cfm; + return viewBuilder.build().params(properties.properties.asNewTableParams()); } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java index 45c2cfa..464ac3d 100644 --- a/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java +++ b/src/java/org/apache/cassandra/dht/OrderPreservingPartitioner.java @@ -221,7 +221,7 @@ public class OrderPreservingPartitioner implements IPartitioner for (Range<Token> r : sortedRanges) { // Looping over every KS:CF:Range, get the splits size and add it to the count - allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.cfName, r, cfmd.getMinIndexInterval()).size()); + allTokens.put(r.right, allTokens.get(r.right) + StorageService.instance.getSplits(ks, cfmd.cfName, r, cfmd.params.minIndexInterval).size()); } } } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/hadoop/ConfigHelper.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java index b5c66df..a9dcc6c 100644 --- a/src/java/org/apache/cassandra/hadoop/ConfigHelper.java +++ b/src/java/org/apache/cassandra/hadoop/ConfigHelper.java @@ -1,4 +1,3 @@ -package org.apache.cassandra.hadoop; /* * * Licensed to the Apache Software Foundation (ASF) under one @@ -19,6 +18,7 @@ package org.apache.cassandra.hadoop; * under the License. * */ +package org.apache.cassandra.hadoop; import java.io.IOException; import java.util.*; @@ -27,7 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.cassandra.dht.IPartitioner; -import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.thrift.*; import org.apache.cassandra.utils.FBUtilities; import org.apache.cassandra.utils.Hex; @@ -456,7 +456,7 @@ public class ConfigHelper public static String getOutputCompressionChunkLength(Configuration conf) { - return conf.get(OUTPUT_COMPRESSION_CHUNK_LENGTH, String.valueOf(CompressionParameters.DEFAULT_CHUNK_LENGTH)); + return conf.get(OUTPUT_COMPRESSION_CHUNK_LENGTH, String.valueOf(CompressionParams.DEFAULT_CHUNK_LENGTH)); } public static void setOutputCompressionClass(Configuration conf, String classname) http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java index 0717121..bc1e6f6 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java +++ b/src/java/org/apache/cassandra/io/compress/CompressedSequentialWriter.java @@ -32,7 +32,7 @@ import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.io.util.DataIntegrityMetadata; import org.apache.cassandra.io.util.FileMark; import org.apache.cassandra.io.util.SequentialWriter; -import org.apache.cassandra.utils.FBUtilities; +import org.apache.cassandra.schema.CompressionParams; public class CompressedSequentialWriter extends SequentialWriter { @@ -60,7 +60,7 @@ public class CompressedSequentialWriter extends SequentialWriter public CompressedSequentialWriter(File file, String offsetsPath, - CompressionParameters parameters, + CompressionParams parameters, MetadataCollector sstableMetadataCollector) { super(file, parameters.chunkLength(), parameters.getSstableCompressor().preferredBufferType()); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java index ff9ae64..bd6da2c 100644 --- a/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java +++ b/src/java/org/apache/cassandra/io/compress/CompressionMetadata.java @@ -52,6 +52,7 @@ import org.apache.cassandra.io.util.DataOutputPlus; import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.io.util.Memory; import org.apache.cassandra.io.util.SafeMemory; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.utils.Pair; import org.apache.cassandra.utils.concurrent.Transactional; import org.apache.cassandra.utils.concurrent.Ref; @@ -69,7 +70,7 @@ public class CompressionMetadata private final Memory chunkOffsets; private final long chunkOffsetsSize; public final String indexFilePath; - public final CompressionParameters parameters; + public final CompressionParams parameters; /** * Create metadata about given compressed file including uncompressed data length, chunk size @@ -107,11 +108,11 @@ public class CompressionMetadata int chunkLength = stream.readInt(); try { - parameters = new CompressionParameters(compressorName, chunkLength, options); + parameters = new CompressionParams(compressorName, chunkLength, options); } catch (ConfigurationException e) { - throw new RuntimeException("Cannot create CompressionParameters for stored parameters", e); + throw new RuntimeException("Cannot create CompressionParams for stored parameters", e); } dataLength = stream.readLong(); @@ -130,7 +131,7 @@ public class CompressionMetadata this.chunkOffsetsSize = chunkOffsets.size(); } - private CompressionMetadata(String filePath, CompressionParameters parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength) + private CompressionMetadata(String filePath, CompressionParams parameters, SafeMemory offsets, long offsetsSize, long dataLength, long compressedLength) { this.indexFilePath = filePath; this.parameters = parameters; @@ -275,7 +276,7 @@ public class CompressionMetadata public static class Writer extends Transactional.AbstractTransactional implements Transactional { // path to the file - private final CompressionParameters parameters; + private final CompressionParams parameters; private final String filePath; private int maxCount = 100; private SafeMemory offsets = new SafeMemory(maxCount * 8L); @@ -284,13 +285,13 @@ public class CompressionMetadata // provided by user when setDescriptor private long dataLength, chunkCount; - private Writer(CompressionParameters parameters, String path) + private Writer(CompressionParams parameters, String path) { this.parameters = parameters; filePath = path; } - public static Writer open(CompressionParameters parameters, String path) + public static Writer open(CompressionParams parameters, String path) { return new Writer(parameters, path); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/compress/CompressionParameters.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java b/src/java/org/apache/cassandra/io/compress/CompressionParameters.java deleted file mode 100644 index c828e27..0000000 --- a/src/java/org/apache/cassandra/io/compress/CompressionParameters.java +++ /dev/null @@ -1,564 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.cassandra.io.compress; - -import java.io.IOException; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.AbstractSet; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; - -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Sets; - -import org.apache.commons.lang3.builder.EqualsBuilder; -import org.apache.commons.lang3.builder.HashCodeBuilder; - -import org.slf4j.LoggerFactory; -import org.slf4j.Logger; -import org.apache.cassandra.config.CFMetaData; -import org.apache.cassandra.config.ParameterizedClass; -import org.apache.cassandra.db.TypeSizes; -import org.apache.cassandra.exceptions.ConfigurationException; -import org.apache.cassandra.io.IVersionedSerializer; -import org.apache.cassandra.io.util.DataInputPlus; -import org.apache.cassandra.io.util.DataOutputPlus; - -public class CompressionParameters -{ - private final static Logger LOGGER = LoggerFactory.getLogger(CompressionParameters.class); - - private volatile static boolean hasLoggedSsTableCompressionWarning; - private volatile static boolean hasLoggedChunkLengthWarning; - - public final static int DEFAULT_CHUNK_LENGTH = 65536; - public final static double DEFAULT_CRC_CHECK_CHANCE = 1.0; - public final static IVersionedSerializer<CompressionParameters> serializer = new Serializer(); - - public static final String CLASS = "class"; - public static final String CHUNK_LENGTH_IN_KB = "chunk_length_in_kb"; - public static final String ENABLED = "enabled"; - @Deprecated - public static final String SSTABLE_COMPRESSION = "sstable_compression"; - @Deprecated - public static final String CHUNK_LENGTH_KB = "chunk_length_kb"; - public static final String CRC_CHECK_CHANCE = "crc_check_chance"; - - public static final Set<String> GLOBAL_OPTIONS = ImmutableSet.of(CRC_CHECK_CHANCE); - - private final ICompressor sstableCompressor; - private final Integer chunkLength; - private volatile double crcCheckChance; - private final ImmutableMap<String, String> otherOptions; // Unrecognized options, can be use by the compressor - private CFMetaData liveMetadata; - - public static CompressionParameters fromMap(Map<? extends CharSequence, ? extends CharSequence> opts) - { - Map<String, String> options = copyOptions(opts); - - String sstableCompressionClass; - - if (!removeEnabled(options)) - { - sstableCompressionClass = null; - - if (!options.isEmpty()) - throw new ConfigurationException("If the '" + ENABLED + "' option is set to false" - + " no other options must be specified"); - } - else - { - sstableCompressionClass= removeSstableCompressionClass(options); - } - - Integer chunkLength = removeChunkLength(options); - - CompressionParameters cp = new CompressionParameters(sstableCompressionClass, chunkLength, options); - cp.validate(); - - return cp; - } - - public static CompressionParameters noCompression() - { - return new CompressionParameters((ICompressor) null, DEFAULT_CHUNK_LENGTH, Collections.emptyMap()); - } - - public static CompressionParameters snappy() - { - return snappy(null); - } - - public static CompressionParameters snappy(Integer chunkLength) - { - return new CompressionParameters(SnappyCompressor.instance, chunkLength, Collections.emptyMap()); - } - - public static CompressionParameters deflate() - { - return deflate(null); - } - - public static CompressionParameters deflate(Integer chunkLength) - { - return new CompressionParameters(DeflateCompressor.instance, chunkLength, Collections.emptyMap()); - } - - public static CompressionParameters lz4() - { - return lz4(null); - } - - public static CompressionParameters lz4(Integer chunkLength) - { - return new CompressionParameters(LZ4Compressor.instance, chunkLength, Collections.emptyMap()); - } - - CompressionParameters(String sstableCompressorClass, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException - { - this(createCompressor(parseCompressorClass(sstableCompressorClass), otherOptions), chunkLength, otherOptions); - } - - private CompressionParameters(ICompressor sstableCompressor, Integer chunkLength, Map<String, String> otherOptions) throws ConfigurationException - { - this.sstableCompressor = sstableCompressor; - this.chunkLength = chunkLength; - this.otherOptions = ImmutableMap.copyOf(otherOptions); - String chance = otherOptions.get(CRC_CHECK_CHANCE); - this.crcCheckChance = (chance == null) ? DEFAULT_CRC_CHECK_CHANCE : parseCrcCheckChance(chance); - } - - public CompressionParameters copy() - { - return new CompressionParameters(sstableCompressor, chunkLength, otherOptions); - } - - public void setLiveMetadata(final CFMetaData liveMetadata) - { - if (liveMetadata == null) - return; - - this.liveMetadata = liveMetadata; - } - - public void setCrcCheckChance(double crcCheckChance) throws ConfigurationException - { - validateCrcCheckChance(crcCheckChance); - this.crcCheckChance = crcCheckChance; - - if (liveMetadata != null && this != liveMetadata.compressionParameters) - liveMetadata.compressionParameters.setCrcCheckChance(crcCheckChance); - } - - /** - * Checks if compression is enabled. - * @return <code>true</code> if compression is enabled, <code>false</code> otherwise. - */ - public boolean isEnabled() - { - return sstableCompressor != null; - } - - /** - * Returns the SSTable compressor. - * @return the SSTable compressor or <code>null</code> if compression is disabled. - */ - public ICompressor getSstableCompressor() - { - return sstableCompressor; - } - - public ImmutableMap<String, String> getOtherOptions() - { - return otherOptions; - } - - public double getCrcCheckChance() - { - return liveMetadata == null ? this.crcCheckChance : liveMetadata.compressionParameters.crcCheckChance; - } - - private static double parseCrcCheckChance(String crcCheckChance) throws ConfigurationException - { - try - { - double chance = Double.parseDouble(crcCheckChance); - validateCrcCheckChance(chance); - return chance; - } - catch (NumberFormatException e) - { - throw new ConfigurationException("crc_check_chance should be a double"); - } - } - - private static void validateCrcCheckChance(double crcCheckChance) throws ConfigurationException - { - if (crcCheckChance < 0.0d || crcCheckChance > 1.0d) - throw new ConfigurationException("crc_check_chance should be between 0.0 and 1.0"); - } - - public int chunkLength() - { - return chunkLength == null ? DEFAULT_CHUNK_LENGTH : chunkLength; - } - - private static Class<?> parseCompressorClass(String className) throws ConfigurationException - { - if (className == null || className.isEmpty()) - return null; - - className = className.contains(".") ? className : "org.apache.cassandra.io.compress." + className; - try - { - return Class.forName(className); - } - catch (Exception e) - { - throw new ConfigurationException("Could not create Compression for type " + className, e); - } - } - - private static ICompressor createCompressor(Class<?> compressorClass, Map<String, String> compressionOptions) throws ConfigurationException - { - if (compressorClass == null) - { - if (!compressionOptions.isEmpty()) - throw new ConfigurationException("Unknown compression options (" + compressionOptions.keySet() + ") since no compression class found"); - return null; - } - - try - { - Method method = compressorClass.getMethod("create", Map.class); - ICompressor compressor = (ICompressor)method.invoke(null, compressionOptions); - // Check for unknown options - AbstractSet<String> supportedOpts = Sets.union(compressor.supportedOptions(), GLOBAL_OPTIONS); - for (String provided : compressionOptions.keySet()) - if (!supportedOpts.contains(provided)) - throw new ConfigurationException("Unknown compression options " + provided); - return compressor; - } - catch (NoSuchMethodException e) - { - throw new ConfigurationException("create method not found", e); - } - catch (SecurityException e) - { - throw new ConfigurationException("Access forbiden", e); - } - catch (IllegalAccessException e) - { - throw new ConfigurationException("Cannot access method create in " + compressorClass.getName(), e); - } - catch (InvocationTargetException e) - { - Throwable cause = e.getCause(); - throw new ConfigurationException(String.format("%s.create() threw an error: %s", - compressorClass.getSimpleName(), - cause == null ? e.getClass().getName() + " " + e.getMessage() : cause.getClass().getName() + " " + cause.getMessage()), - e); - } - catch (ExceptionInInitializerError e) - { - throw new ConfigurationException("Cannot initialize class " + compressorClass.getName()); - } - } - - public static ICompressor createCompressor(ParameterizedClass compression) throws ConfigurationException { - return createCompressor(parseCompressorClass(compression.class_name), copyOptions(compression.parameters)); - } - - private static Map<String, String> copyOptions(Map<? extends CharSequence, ? extends CharSequence> co) - { - if (co == null || co.isEmpty()) - return Collections.<String, String>emptyMap(); - - Map<String, String> compressionOptions = new HashMap<String, String>(); - for (Map.Entry<? extends CharSequence, ? extends CharSequence> entry : co.entrySet()) - { - compressionOptions.put(entry.getKey().toString(), entry.getValue().toString()); - } - return compressionOptions; - } - - /** - * Parse the chunk length (in KB) and returns it as bytes. - * - * @param chLengthKB the length of the chunk to parse - * @return the chunk length in bytes - * @throws ConfigurationException if the chunk size is too large - */ - private static Integer parseChunkLength(String chLengthKB) throws ConfigurationException - { - if (chLengthKB == null) - return null; - - try - { - int parsed = Integer.parseInt(chLengthKB); - if (parsed > Integer.MAX_VALUE / 1024) - throw new ConfigurationException("Value of " + CHUNK_LENGTH_IN_KB + " is too large (" + parsed + ")"); - return 1024 * parsed; - } - catch (NumberFormatException e) - { - throw new ConfigurationException("Invalid value for " + CHUNK_LENGTH_IN_KB, e); - } - } - - /** - * Removes the chunk length option from the specified set of option. - * - * @param options the options - * @return the chunk length value - */ - private static Integer removeChunkLength(Map<String, String> options) - { - if (options.containsKey(CHUNK_LENGTH_IN_KB)) - { - if (options.containsKey(CHUNK_LENGTH_KB)) - { - throw new ConfigurationException(String.format("The '%s' option must not be used if the chunk length is already specified by the '%s' option", - CHUNK_LENGTH_KB, - CHUNK_LENGTH_IN_KB)); - } - - return parseChunkLength(options.remove(CHUNK_LENGTH_IN_KB)); - } - - if (options.containsKey(CHUNK_LENGTH_KB)) - { - if (options.containsKey(CHUNK_LENGTH_KB) && !hasLoggedChunkLengthWarning) - { - hasLoggedChunkLengthWarning = true; - LOGGER.warn(String.format("The %s option has been deprecated. You should use %s instead", - CHUNK_LENGTH_KB, - CHUNK_LENGTH_IN_KB)); - } - - return parseChunkLength(options.remove(CHUNK_LENGTH_KB)); - } - - return null; - } - - /** - * Returns <code>true</code> if the specified options contains the name of the compression class to be used, - * <code>false</code> otherwise. - * - * @param options the options - * @return <code>true</code> if the specified options contains the name of the compression class to be used, - * <code>false</code> otherwise. - */ - public static boolean containsSstableCompressionClass(Map<String, String> options) - { - return options.containsKey(CLASS) - || options.containsKey(SSTABLE_COMPRESSION); - } - - /** - * Removes the option specifying the name of the compression class - * - * @param options the options - * @return the name of the compression class - */ - private static String removeSstableCompressionClass(Map<String, String> options) - { - if (options.containsKey(CLASS)) - { - if (options.containsKey(SSTABLE_COMPRESSION)) - throw new ConfigurationException(String.format("The '%s' option must not be used if the compression algorithm is already specified by the '%s' option", - SSTABLE_COMPRESSION, - CLASS)); - - String clazz = options.remove(CLASS); - if (clazz.isEmpty()) - throw new ConfigurationException(String.format("The '%s' option must not be empty. To disable compression use 'enabled' : false", CLASS)); - - return clazz; - } - - if (options.containsKey(SSTABLE_COMPRESSION) && !hasLoggedSsTableCompressionWarning) - { - hasLoggedSsTableCompressionWarning = true; - LOGGER.warn(String.format("The %s option has been deprecated. You should use %s instead", - SSTABLE_COMPRESSION, - CLASS)); - } - - return options.remove(SSTABLE_COMPRESSION); - } - - /** - * Returns <code>true</code> if the options contains the <code>enabled</code> option and that its value is - * <code>true</code>, otherwise returns <code>false</code>. - * - * @param options the options - * @return <code>true</code> if the options contains the <code>enabled</code> option and that its value is - * <code>true</code>, otherwise returns <code>false</code>. - */ - public static boolean isEnabled(Map<String, String> options) - { - String enabled = options.get(ENABLED); - return enabled == null || Boolean.parseBoolean(enabled); - } - - /** - * Removes the <code>enabled</code> option from the specified options. - * - * @param options the options - * @return the value of the <code>enabled</code> option - */ - private static boolean removeEnabled(Map<String, String> options) - { - String enabled = options.remove(ENABLED); - return enabled == null || Boolean.parseBoolean(enabled); - } - - // chunkLength must be a power of 2 because we assume so when - // computing the chunk number from an uncompressed file offset (see - // CompressedRandomAccessReader.decompresseChunk()) - public void validate() throws ConfigurationException - { - // if chunk length was not set (chunkLength == null), this is fine, default will be used - if (chunkLength != null) - { - if (chunkLength <= 0) - throw new ConfigurationException("Invalid negative or null " + CHUNK_LENGTH_IN_KB); - - int c = chunkLength; - boolean found = false; - while (c != 0) - { - if ((c & 0x01) != 0) - { - if (found) - throw new ConfigurationException(CHUNK_LENGTH_IN_KB + " must be a power of 2"); - else - found = true; - } - c >>= 1; - } - } - - validateCrcCheckChance(crcCheckChance); - } - - public Map<String, String> asMap() - { - if (!isEnabled()) - return Collections.singletonMap(ENABLED, "false"); - - Map<String, String> options = new HashMap<String, String>(otherOptions); - options.put(CLASS, sstableCompressor.getClass().getName()); - options.put(CHUNK_LENGTH_IN_KB, chunkLengthInKB()); - return options; - } - - public String chunkLengthInKB() - { - return String.valueOf(chunkLength() / 1024); - } - - @Override - public boolean equals(Object obj) - { - if (obj == this) - { - return true; - } - else if (obj == null || obj.getClass() != getClass()) - { - return false; - } - - CompressionParameters cp = (CompressionParameters) obj; - return new EqualsBuilder() - .append(sstableCompressor, cp.sstableCompressor) - .append(chunkLength(), cp.chunkLength()) - .append(otherOptions, cp.otherOptions) - .isEquals(); - } - - @Override - public int hashCode() - { - return new HashCodeBuilder(29, 1597) - .append(sstableCompressor) - .append(chunkLength()) - .append(otherOptions) - .toHashCode(); - } - - static class Serializer implements IVersionedSerializer<CompressionParameters> - { - public void serialize(CompressionParameters parameters, DataOutputPlus out, int version) throws IOException - { - out.writeUTF(parameters.sstableCompressor.getClass().getSimpleName()); - out.writeInt(parameters.otherOptions.size()); - for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet()) - { - out.writeUTF(entry.getKey()); - out.writeUTF(entry.getValue()); - } - out.writeInt(parameters.chunkLength()); - } - - public CompressionParameters deserialize(DataInputPlus in, int version) throws IOException - { - String compressorName = in.readUTF(); - int optionCount = in.readInt(); - Map<String, String> options = new HashMap<String, String>(); - for (int i = 0; i < optionCount; ++i) - { - String key = in.readUTF(); - String value = in.readUTF(); - options.put(key, value); - } - int chunkLength = in.readInt(); - CompressionParameters parameters; - try - { - parameters = new CompressionParameters(compressorName, chunkLength, options); - } - catch (ConfigurationException e) - { - throw new RuntimeException("Cannot create CompressionParameters for parameters", e); - } - return parameters; - } - - public long serializedSize(CompressionParameters parameters, int version) - { - long size = TypeSizes.sizeof(parameters.sstableCompressor.getClass().getSimpleName()); - size += TypeSizes.sizeof(parameters.otherOptions.size()); - for (Map.Entry<String, String> entry : parameters.otherOptions.entrySet()) - { - size += TypeSizes.sizeof(entry.getKey()); - size += TypeSizes.sizeof(entry.getValue()); - } - size += TypeSizes.sizeof(parameters.chunkLength()); - return size; - } - } -} http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java index 5fd4309..069cc96 100644 --- a/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java +++ b/src/java/org/apache/cassandra/io/compress/LZ4Compressor.java @@ -27,6 +27,7 @@ import java.util.Set; import com.google.common.annotations.VisibleForTesting; import net.jpountz.lz4.LZ4Exception; import net.jpountz.lz4.LZ4Factory; +import org.apache.cassandra.schema.CompressionParams; public class LZ4Compressor implements ICompressor { @@ -126,7 +127,7 @@ public class LZ4Compressor implements ICompressor public Set<String> supportedOptions() { - return new HashSet<>(Arrays.asList(CompressionParameters.CRC_CHECK_CHANCE)); + return new HashSet<>(Arrays.asList(CompressionParams.CRC_CHECK_CHANCE)); } public BufferType preferredBufferType() http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java index 9b6ab6b..28b5964 100644 --- a/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java +++ b/src/java/org/apache/cassandra/io/sstable/IndexSummaryManager.java @@ -332,8 +332,8 @@ public class IndexSummaryManager implements IndexSummaryManagerMBean long remainingSpace = memoryPoolCapacity; for (SSTableReader sstable : sstables) { - int minIndexInterval = sstable.metadata.getMinIndexInterval(); - int maxIndexInterval = sstable.metadata.getMaxIndexInterval(); + int minIndexInterval = sstable.metadata.params.minIndexInterval; + int maxIndexInterval = sstable.metadata.params.maxIndexInterval; double readsPerSec = sstable.getReadMeter() == null ? 0.0 : sstable.getReadMeter().fifteenMinuteRate(); long idealSpace = Math.round(remainingSpace * (readsPerSec / totalReadsPerSec)); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java index 5ceced5..f13fbba 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableReader.java @@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory; import com.clearspring.analytics.stream.cardinality.CardinalityMergeException; import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus; import com.clearspring.analytics.stream.cardinality.ICardinality; -import org.apache.cassandra.cache.CachingOptions; import org.apache.cassandra.cache.InstrumentingCache; import org.apache.cassandra.cache.KeyCacheKey; import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor; @@ -56,6 +55,7 @@ import org.apache.cassandra.io.sstable.metadata.*; import org.apache.cassandra.io.util.*; import org.apache.cassandra.metrics.RestorableMeter; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.schema.CachingParams; import org.apache.cassandra.service.ActiveRepairService; import org.apache.cassandra.service.CacheService; import org.apache.cassandra.utils.*; @@ -650,7 +650,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS private void load(ValidationMetadata validation) throws IOException { - if (metadata.getBloomFilterFpChance() == 1.0) + if (metadata.params.bloomFilterFpChance == 1.0) { // bf is disabled. load(false, true); @@ -667,7 +667,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS // bf is enabled, but filter component is missing. load(true, true); } - else if (validation.bloomFilterFPChance != metadata.getBloomFilterFpChance()) + else if (validation.bloomFilterFPChance != metadata.params.bloomFilterFpChance) { // bf fp chance in sstable metadata and it has changed since compaction. load(true, true); @@ -789,9 +789,9 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS : estimateRowsFromIndex(primaryIndex); // statistics is supposed to be optional if (recreateBloomFilter) - bf = FilterFactory.getFilter(estimatedKeys, metadata.getBloomFilterFpChance(), true, descriptor.version.hasOldBfHashOrder()); + bf = FilterFactory.getFilter(estimatedKeys, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder()); - try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.getMinIndexInterval(), samplingLevel)) + try (IndexSummaryBuilder summaryBuilder = summaryLoaded ? null : new IndexSummaryBuilder(estimatedKeys, metadata.params.minIndexInterval, samplingLevel)) { long indexPosition; RowIndexEntry.IndexSerializer rowIndexSerializer = descriptor.getFormat().getIndexSerializer(metadata, descriptor.version, header); @@ -849,7 +849,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS iStream = new DataInputStream(new FileInputStream(summariesFile)); indexSummary = IndexSummary.serializer.deserialize( iStream, getPartitioner(), descriptor.version.hasSamplingLevel(), - metadata.getMinIndexInterval(), metadata.getMaxIndexInterval()); + metadata.params.minIndexInterval, metadata.params.maxIndexInterval); first = decorateKey(ByteBufferUtil.readWithLength(iStream)); last = decorateKey(ByteBufferUtil.readWithLength(iStream)); ibuilder.deserializeBounds(iStream); @@ -1134,8 +1134,8 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS { assert openReason != OpenReason.EARLY; - int minIndexInterval = metadata.getMinIndexInterval(); - int maxIndexInterval = metadata.getMaxIndexInterval(); + int minIndexInterval = metadata.params.minIndexInterval; + int maxIndexInterval = metadata.params.maxIndexInterval; double effectiveInterval = indexSummary.getEffectiveIndexInterval(); IndexSummary newSummary; @@ -1183,7 +1183,7 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS try { long indexSize = primaryIndex.length(); - try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.getMinIndexInterval(), newSamplingLevel)) + try (IndexSummaryBuilder summaryBuilder = new IndexSummaryBuilder(estimatedKeys(), metadata.params.minIndexInterval, newSamplingLevel)) { long indexPosition; while ((indexPosition = primaryIndex.getFilePointer()) != indexSize) @@ -1515,14 +1515,10 @@ public abstract class SSTableReader extends SSTable implements SelfRefCounted<SS public void cacheKey(DecoratedKey key, RowIndexEntry info) { - CachingOptions caching = metadata.getCaching(); + CachingParams caching = metadata.params.caching; - if (!caching.keyCache.isEnabled() - || keyCache == null - || keyCache.getCapacity() == 0) - { + if (!caching.cacheKeys() || keyCache == null || keyCache.getCapacity() == 0) return; - } KeyCacheKey cacheKey = new KeyCacheKey(metadata.cfId, descriptor, key.getKey()); logger.trace("Adding cache entry for {} -> {}", cacheKey, info); http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java index fa691b8..1a3f796 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/SSTableWriter.java @@ -133,10 +133,10 @@ public abstract class SSTableWriter extends SSTable implements Transactional Component.TOC, Component.DIGEST)); - if (metadata.getBloomFilterFpChance() < 1.0) + if (metadata.params.bloomFilterFpChance < 1.0) components.add(Component.FILTER); - if (metadata.compressionParameters().isEnabled()) + if (metadata.params.compression.isEnabled()) { components.add(Component.COMPRESSION_INFO); } @@ -251,7 +251,7 @@ public abstract class SSTableWriter extends SSTable implements Transactional protected Map<MetadataType, MetadataComponent> finalizeMetadata() { return metadataCollector.finalizeMetadata(getPartitioner().getClass().getCanonicalName(), - metadata.getBloomFilterFpChance(), + metadata.params.bloomFilterFpChance, repairedAt, header); } http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java index 5607a7e..2b60479 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableWriter.java @@ -71,7 +71,7 @@ public class BigTableWriter extends SSTableWriter { dataFile = SequentialWriter.open(getFilename(), descriptor.filenameFor(Component.COMPRESSION_INFO), - metadata.compressionParameters(), + metadata.params.compression, metadataCollector); dbuilder = SegmentedFile.getCompressedBuilder((CompressedSequentialWriter) dataFile); } @@ -376,8 +376,8 @@ public class BigTableWriter extends SSTableWriter { indexFile = SequentialWriter.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX))); builder = SegmentedFile.getBuilder(DatabaseDescriptor.getIndexAccessMode(), false); - summary = new IndexSummaryBuilder(keyCount, metadata.getMinIndexInterval(), Downsampling.BASE_SAMPLING_LEVEL); - bf = FilterFactory.getFilter(keyCount, metadata.getBloomFilterFpChance(), true, descriptor.version.hasOldBfHashOrder()); + summary = new IndexSummaryBuilder(keyCount, metadata.params.minIndexInterval, Downsampling.BASE_SAMPLING_LEVEL); + bf = FilterFactory.getFilter(keyCount, metadata.params.bloomFilterFpChance, true, descriptor.version.hasOldBfHashOrder()); // register listeners to be alerted when the data files are flushed indexFile.setPostFlushListener(new Runnable() { http://git-wip-us.apache.org/repos/asf/cassandra/blob/b31845c4/src/java/org/apache/cassandra/io/util/SequentialWriter.java ---------------------------------------------------------------------- diff --git a/src/java/org/apache/cassandra/io/util/SequentialWriter.java b/src/java/org/apache/cassandra/io/util/SequentialWriter.java index 90340ca..ddabe89 100644 --- a/src/java/org/apache/cassandra/io/util/SequentialWriter.java +++ b/src/java/org/apache/cassandra/io/util/SequentialWriter.java @@ -29,7 +29,7 @@ import org.apache.cassandra.io.FSReadError; import org.apache.cassandra.io.FSWriteError; import org.apache.cassandra.io.compress.BufferType; import org.apache.cassandra.io.compress.CompressedSequentialWriter; -import org.apache.cassandra.io.compress.CompressionParameters; +import org.apache.cassandra.schema.CompressionParams; import org.apache.cassandra.io.sstable.Descriptor; import org.apache.cassandra.io.sstable.metadata.MetadataCollector; import org.apache.cassandra.utils.CLibrary; @@ -168,7 +168,7 @@ public class SequentialWriter extends OutputStream implements WritableByteChanne public static CompressedSequentialWriter open(String dataFilePath, String offsetsPath, - CompressionParameters parameters, + CompressionParams parameters, MetadataCollector sstableMetadataCollector) { return new CompressedSequentialWriter(new File(dataFilePath), offsetsPath, parameters, sstableMetadataCollector);
