This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new b8b30bc  KAFKA-9700: Fix negative estimatedCompressionRatio (#8285)
b8b30bc is described below

commit b8b30bc3865c4d616b9bff04ef345c00a0ae62d1
Author: jiameixie <50685847+jiamei...@users.noreply.github.com>
AuthorDate: Wed Mar 25 11:48:50 2020 +0800

    KAFKA-9700: Fix negative estimatedCompressionRatio (#8285)
    
    There are cases where `currentEstimation` is less than
    `COMPRESSION_RATIO_IMPROVING_STEP` causing
    `estimatedCompressionRatio` to be negative. This, in turn,
    may result in `MESSAGE_TOO_LARGE`.
    
    Reviewers: Ismael Juma <ism...@juma.me.uk>
---
 .../common/record/CompressionRatioEstimator.java   |  4 +-
 .../record/CompressionRatioEstimatorTest.java      | 53 ++++++++++++++++++++++
 2 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
index 7f11784..264525b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/CompressionRatioEstimator.java
@@ -46,7 +46,7 @@ public class CompressionRatioEstimator {
             if (observedRatio > currentEstimation)
                 compressionRatioForTopic[type.id] = Math.max(currentEstimation 
+ COMPRESSION_RATIO_DETERIORATE_STEP, observedRatio);
             else if (observedRatio < currentEstimation) {
-                compressionRatioForTopic[type.id] = currentEstimation - 
COMPRESSION_RATIO_IMPROVING_STEP;
+                compressionRatioForTopic[type.id] = Math.max(currentEstimation 
- COMPRESSION_RATIO_IMPROVING_STEP, observedRatio);
             }
         }
         return compressionRatioForTopic[type.id];
@@ -108,4 +108,4 @@ public class CompressionRatioEstimator {
         }
         return compressionRatio;
     }
-}
\ No newline at end of file
+}
diff --git a/clients/src/test/java/org/apache/kafka/common/record/CompressionRatioEstimatorTest.java b/clients/src/test/java/org/apache/kafka/common/record/CompressionRatioEstimatorTest.java
new file mode 100644
index 0000000..4a2317c
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/CompressionRatioEstimatorTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import java.util.Arrays;
+import java.util.List;
+
+public class CompressionRatioEstimatorTest {
+
+    @Test
+    public void testUpdateEstimation() {
+        class EstimationsObservedRatios {
+            float currentEstimation;
+            float observedRatio;
+            EstimationsObservedRatios(float currentEstimation, float 
observedRatio) {
+                this.currentEstimation = currentEstimation;
+                this.observedRatio = observedRatio;
+            }
+        }
+
+        // updateEstimation raises the estimate to max(currentEstimation + COMPRESSION_RATIO_DETERIORATE_STEP (0.05),
+        // observedRatio) when observedRatio > currentEstimation, and lowers it to max(currentEstimation -
+        // COMPRESSION_RATIO_IMPROVING_STEP (0.005), observedRatio) when observedRatio < currentEstimation. In all four
+        // cases below the result must therefore be >= observedRatio; the last case (0.004 - 0.005 < 0) went negative.
+        List<EstimationsObservedRatios> estimationsObservedRatios = 
Arrays.asList(
+            new EstimationsObservedRatios(0.8f, 0.84f),
+            new EstimationsObservedRatios(0.6f, 0.7f),
+            new EstimationsObservedRatios(0.6f, 0.4f),
+            new EstimationsObservedRatios(0.004f, 0.001f));
+        for (EstimationsObservedRatios estimationsObservedRatio : 
estimationsObservedRatios) {
+            String topic = "tp";
+            CompressionRatioEstimator.setEstimation(topic, 
CompressionType.ZSTD, estimationsObservedRatio.currentEstimation);
+            float updatedCompressionRatio = 
CompressionRatioEstimator.updateEstimation(topic, CompressionType.ZSTD, 
estimationsObservedRatio.observedRatio);
+            assertTrue(updatedCompressionRatio >= 
estimationsObservedRatio.observedRatio);
+        }
+    }
+}

Reply via email to