This is an automated email from the ASF dual-hosted git repository.

clohfink pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4782fd3  Avoid failing compactions with very large partitions
4782fd3 is described below

commit 4782fd399d97be551032576c4d77f079e862fdba
Author: Caleb Rackliffe <[email protected]>
AuthorDate: Wed Sep 23 16:28:47 2020 -0500

    Avoid failing compactions with very large partitions
    
    Patch by Caleb Rackliffe; Reviewed by Chris Lohfink and Benjamin Lerer for 
CASSANDRA-15164
---
 CHANGES.txt                                        |  1 +
 .../io/sstable/metadata/MetadataCollector.java     |  4 +--
 .../io/sstable/metadata/StatsMetadata.java         | 31 +++++++++++++++++++---
 .../apache/cassandra/utils/EstimatedHistogram.java | 31 +++++++++++++++++++---
 .../sstable/metadata/MetadataSerializerTest.java   | 27 +++++++++++++++++++
 .../cassandra/utils/EstimatedHistogramTest.java    | 12 +++++++++
 .../apache/cassandra/utils/SerializationsTest.java |  2 +-
 7 files changed, 99 insertions(+), 9 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 42b7aa7..1db3ebc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.0-beta3
+ * Avoid failing compactions with very large partitions (CASSANDRA-15164)
  * Prevent NPE in StreamMessage in type lookup (CASSANDRA-16131)
  * Avoid invalid state transition exception during incremental repair 
(CASSANDRA-16067)
  * Allow zero padding in timestamp serialization (CASSANDRA-16105)
diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
index 570ede7..65d534e 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/MetadataCollector.java
@@ -53,8 +53,8 @@ public class MetadataCollector implements 
PartitionStatisticsCollector
 
     static EstimatedHistogram defaultCellPerPartitionCountHistogram()
     {
-        // EH of 114 can track a max value of 2395318855, i.e., > 2B columns
-        return new EstimatedHistogram(114);
+        // EH of 118 can track a max value of 4139110981, i.e., > 4B cells
+        return new EstimatedHistogram(118);
     }
 
     static EstimatedHistogram defaultPartitionSizeHistogram()
diff --git 
a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java 
b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
index f4e5beb..e985f40 100755
--- a/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
+++ b/src/java/org/apache/cassandra/io/sstable/metadata/StatsMetadata.java
@@ -23,14 +23,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.cassandra.db.rows.EncodingStats;
-import org.apache.cassandra.io.ISerializer;
-import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.db.rows.EncodingStats;
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.db.commitlog.CommitLogPosition;
 import org.apache.cassandra.db.commitlog.IntervalSet;
+import org.apache.cassandra.io.ISerializer;
+import org.apache.cassandra.io.sstable.format.Version;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.ByteBufferUtil;
@@ -248,6 +251,8 @@ public class StatsMetadata extends MetadataComponent
 
     public static class StatsMetadataSerializer implements 
IMetadataComponentSerializer<StatsMetadata>
     {
+        private static final Logger logger = 
LoggerFactory.getLogger(StatsMetadataSerializer.class);
+
         public int serializedSize(Version version, StatsMetadata component) 
throws IOException
         {
             int size = 0;
@@ -340,7 +345,27 @@ public class StatsMetadata extends MetadataComponent
         public StatsMetadata deserialize(Version version, DataInputPlus in) 
throws IOException
         {
             EstimatedHistogram partitionSizes = 
EstimatedHistogram.serializer.deserialize(in);
+
+            if (partitionSizes.isOverflowed())
+            {
+                logger.warn("Deserialized partition size histogram with {} 
values greater than the maximum of {}. " +
+                            "Clearing the overflow bucket to allow for 
degraded mean and percentile calculations...",
+                            partitionSizes.overflowCount(), 
partitionSizes.getLargestBucketOffset());
+
+                partitionSizes.clearOverflow();
+            }
+
             EstimatedHistogram columnCounts = 
EstimatedHistogram.serializer.deserialize(in);
+
+            if (columnCounts.isOverflowed())
+            {
+                logger.warn("Deserialized partition cell count histogram with 
{} values greater than the maximum of {}. " +
+                            "Clearing the overflow bucket to allow for 
degraded mean and percentile calculations...",
+                            columnCounts.overflowCount(), 
columnCounts.getLargestBucketOffset());
+
+                columnCounts.clearOverflow();
+            }
+
             CommitLogPosition commitLogLowerBound = CommitLogPosition.NONE, 
commitLogUpperBound;
             commitLogUpperBound = CommitLogPosition.serializer.deserialize(in);
             long minTimestamp = in.readLong();
diff --git a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java 
b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
index f25cc1e..a494b3a 100644
--- a/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
+++ b/src/java/org/apache/cassandra/utils/EstimatedHistogram.java
@@ -22,12 +22,13 @@ import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicLongArray;
 
 import com.google.common.base.Objects;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.cassandra.db.TypeSizes;
 import org.apache.cassandra.io.ISerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
-import org.slf4j.Logger;
 
 public class EstimatedHistogram
 {
@@ -275,11 +276,27 @@ public class EstimatedHistogram
     }
 
     /**
-     * @return true if this histogram has overflowed -- that is, a value 
larger than our largest bucket could bound was added
+     * @return true if a value larger than our largest bucket offset has been 
recorded, and false otherwise
      */
     public boolean isOverflowed()
     {
-        return buckets.get(buckets.length() - 1) > 0;
+        return overflowCount() > 0;
+    }
+
+    /**
+     * @return the number of recorded values larger than the largest bucket 
offset
+     */
+    public long overflowCount()
+    {
+        return buckets.get(buckets.length() - 1);
+    }
+
+    /**
+     * Resets the count in the overflow bucket to zero. Subsequent calls to 
{@link #isOverflowed()} will return false.
+     */
+    public void clearOverflow()
+    {
+        buckets.set(buckets.length() - 1, 0);
     }
 
     /**
@@ -367,8 +384,16 @@ public class EstimatedHistogram
 
     public static class EstimatedHistogramSerializer implements 
ISerializer<EstimatedHistogram>
     {
+        private static final Logger logger = 
LoggerFactory.getLogger(EstimatedHistogramSerializer.class);
+
         public void serialize(EstimatedHistogram eh, DataOutputPlus out) 
throws IOException
         {
+            if (eh.isOverflowed())
+            {
+                logger.warn("Serializing a histogram with {} values greater 
than the maximum of {}...",
+                            eh.overflowCount(), eh.getLargestBucketOffset());
+            }
+
             long[] offsets = eh.getBucketOffsets();
             long[] buckets = eh.getBuckets(false);
             out.writeInt(buckets.length);
diff --git 
a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
 
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
index f109d8f..0ab3be6 100644
--- 
a/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/io/sstable/metadata/MetadataSerializerTest.java
@@ -76,6 +76,33 @@ public class MetadataSerializerTest
         }
     }
 
+    @Test
+    public void testHistogramSerialization() throws IOException
+    {
+        Map<MetadataType, MetadataComponent> originalMetadata = 
constructMetadata();
+
+        // Modify the histograms to overflow:
+        StatsMetadata originalStats = (StatsMetadata) 
originalMetadata.get(MetadataType.STATS);
+        originalStats.estimatedCellPerPartitionCount.add(Long.MAX_VALUE);
+        originalStats.estimatedPartitionSize.add(Long.MAX_VALUE);
+        
assertTrue(originalStats.estimatedCellPerPartitionCount.isOverflowed());
+        assertTrue(originalStats.estimatedPartitionSize.isOverflowed());
+
+        // Serialize w/ overflowed histograms:
+        MetadataSerializer serializer = new MetadataSerializer();
+        File statsFile = serialize(originalMetadata, serializer, 
BigFormat.latestVersion);
+        Descriptor desc = new Descriptor(statsFile.getParentFile(), "", "", 0, 
SSTableFormat.Type.BIG);
+
+        try (RandomAccessReader in = RandomAccessReader.open(statsFile))
+        {
+            // Deserialize and verify that the two histograms have had their 
overflow buckets cleared:
+            Map<MetadataType, MetadataComponent> deserialized = 
serializer.deserialize(desc, in, EnumSet.allOf(MetadataType.class));
+            StatsMetadata deserializedStats = 
(StatsMetadata)deserialized.get(MetadataType.STATS);
+            
assertFalse(deserializedStats.estimatedCellPerPartitionCount.isOverflowed());
+            
assertFalse(deserializedStats.estimatedPartitionSize.isOverflowed());
+        }
+    }
+
     public File serialize(Map<MetadataType, MetadataComponent> metadata, 
MetadataSerializer serializer, Version version)
             throws IOException
     {
diff --git a/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java 
b/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java
index b3fbfb6..edc297a 100644
--- a/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java
+++ b/test/unit/org/apache/cassandra/utils/EstimatedHistogramTest.java
@@ -21,6 +21,8 @@ package org.apache.cassandra.utils;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 
 public class EstimatedHistogramTest
@@ -167,4 +169,14 @@ public class EstimatedHistogramTest
             assertEquals(1, histogram.percentile(0.99));
         }
     }
+
+    @Test
+    public void testClearOverflow()
+    {
+        EstimatedHistogram histogram = new EstimatedHistogram(1);
+        histogram.add(100);
+        assertTrue(histogram.isOverflowed());
+        histogram.clearOverflow();
+        assertFalse(histogram.isOverflowed());
+    }
 }
diff --git a/test/unit/org/apache/cassandra/utils/SerializationsTest.java 
b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
index f260428..6597f3b 100644
--- a/test/unit/org/apache/cassandra/utils/SerializationsTest.java
+++ b/test/unit/org/apache/cassandra/utils/SerializationsTest.java
@@ -159,7 +159,7 @@ public class SerializationsTest extends 
AbstractSerializationsTester
             offsets[i] = i;
             data[i] = 10 * i;
         }
-        data[offsets.length] = 100000;
+        data[offsets.length] = 100000; // write into the overflow bucket
         EstimatedHistogram hist2 = new EstimatedHistogram(offsets, data);
 
         try (DataOutputStreamPlus out = 
getOutput("utils.EstimatedHistogram.bin"))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to