This is an automated email from the ASF dual-hosted git repository.
mdedetrich pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 4c1c337178 Make automatic flushing of compression configurable
4c1c337178 is described below
commit 4c1c337178578d8c3327b2b226f40d0bc704e5f3
Author: Matthew de Detrich <[email protected]>
AuthorDate: Sun Nov 9 18:14:11 2025 +0100
Make automatic flushing of compression configurable
(cherry picked from commit 274d704d6bb918267da846dc97cf43799818cd04)
---
.../pekko/stream/io/compression/CoderSpec.scala | 3 ++
.../io/compression/DeflateAutoFlushSpec.scala | 29 ++++++++++++++
.../stream/io/compression/GzipAutoFlushSpec.scala | 29 ++++++++++++++
.../impl/io/compression/CompressionUtils.scala | 10 ++++-
.../apache/pekko/stream/javadsl/Compression.scala | 39 +++++++++++++++----
.../apache/pekko/stream/scaladsl/Compression.scala | 45 +++++++++++++++-------
6 files changed, 132 insertions(+), 23 deletions(-)
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
index bed6fef7d9..af7191b0d7 100644
---
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/CoderSpec.scala
@@ -45,6 +45,8 @@ abstract class CoderSpec(codecName: String) extends
AnyWordSpec with CodecSpecSu
case object AllDataAllowed extends Exception with NoStackTrace
protected def corruptInputCheck: Boolean = true
+ protected def autoFlush: Boolean = true
+
def extraTests(): Unit = {}
s"The $codecName codec" should {
@@ -145,6 +147,7 @@ abstract class CoderSpec(codecName: String) extends
AnyWordSpec with CodecSpecSu
}
"be able to decode chunk-by-chunk (depending on input chunks)" in {
+ assume(autoFlush)
val minLength = 100
val maxLength = 1000
val numElements = 1000
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala
new file mode 100644
index 0000000000..284aaa3ae1
--- /dev/null
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/DeflateAutoFlushSpec.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
+import org.apache.pekko.util.ByteString
+
+import java.util.zip.Deflater
+
+class DeflateAutoFlushSpec extends DeflateSpec {
+ override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
+ Compression.deflate(Deflater.BEST_COMPRESSION, nowrap = false, autoFlush =
false)
+ override protected val autoFlush: Boolean = false
+}
diff --git
a/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala
new file mode 100644
index 0000000000..031c19428b
--- /dev/null
+++
b/stream-tests/src/test/scala/org/apache/pekko/stream/io/compression/GzipAutoFlushSpec.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.stream.io.compression
+
+import org.apache.pekko.stream.scaladsl.{ Compression, Flow }
+import org.apache.pekko.util.ByteString
+
+import java.util.zip.Deflater
+
+class GzipAutoFlushSpec extends GzipSpec {
+ override protected val encoderFlow: Flow[ByteString, ByteString, Any] =
+ Compression.gzip(Deflater.BEST_COMPRESSION, autoFlush = false)
+ override protected val autoFlush: Boolean = false
+}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
index 7ce44d962c..d6150f48f3 100644
---
a/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
+++
b/stream/src/main/scala/org/apache/pekko/stream/impl/io/compression/CompressionUtils.scala
@@ -28,7 +28,8 @@ import pekko.util.ByteString
/**
* Creates a flow from a compressor constructor.
*/
- def compressorFlow(newCompressor: () => Compressor): Flow[ByteString,
ByteString, NotUsed] =
+ def compressorFlow(newCompressor: () => Compressor, autoFlush: Boolean =
true)
+ : Flow[ByteString, ByteString, NotUsed] =
Flow.fromGraph {
new SimpleLinearGraphStage[ByteString] {
override def createLogic(inheritedAttributes: Attributes):
GraphStageLogic =
@@ -36,7 +37,12 @@ import pekko.util.ByteString
val compressor = newCompressor()
override def onPush(): Unit = {
- val data = compressor.compressAndFlush(grab(in))
+ val grabbed = grab(in)
+ val data = if (autoFlush)
+ compressor.compressAndFlush(grabbed)
+ else
+ compressor.compress(grabbed)
+
if (data.nonEmpty) push(out, data)
else pull(in)
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
index 5f82fa68fa..f2ae5cb5c8 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Compression.scala
@@ -56,10 +56,10 @@ object Compression {
scaladsl.Compression.inflate(maxBytesPerChunk, nowrap).asJava
/**
- * Creates a flow that gzip-compresses a stream of ByteStrings. Note that
the compressor
- * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is
guaranteed that every [[pekko.util.ByteString]]
- * coming out of the flow can be fully decompressed without waiting for
additional data. This may
- * come at a compression performance cost for very small chunks.
+ * Creates a flow that gzip-compresses a stream of ByteStrings. Note that
the compressor will
+ * flush after every single element in the stream so that it is guaranteed that
every [[pekko.util.ByteString]]
+ * coming out of the flow can be fully decompressed without waiting for
additional data. This may come at
+ * a compression performance cost for very small chunks.
*/
def gzip: Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.gzip.asJava
@@ -73,10 +73,21 @@ object Compression {
scaladsl.Compression.gzip(level).asJava
/**
- * Creates a flow that deflate-compresses a stream of ByteString. Note that
the compressor
- * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is
guaranteed that every [[pekko.util.ByteString]]
- * coming out of the flow can be fully decompressed without waiting for
additional data. This may
- * come at a compression performance cost for very small chunks.
+ * Same as [[gzip]] with a custom level and configurable flush mode.
+ *
+ * @param level Compression level (0-9)
+ * @param autoFlush If true will automatically flush after every single
element in the stream.
+ *
+ * @since 1.3.0
+ */
+ def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString,
NotUsed] =
+ scaladsl.Compression.gzip(level, autoFlush).asJava
+
+ /**
+ * Creates a flow that deflate-compresses a stream of ByteString. Note that
the compressor will
+ * flush after every single element in the stream so that it is guaranteed that
every [[pekko.util.ByteString]]
+ * coming out of the flow can be fully decompressed without waiting for
additional data. This may come at
+ * a compression performance cost for very small chunks.
*/
def deflate: Flow[ByteString, ByteString, NotUsed] =
scaladsl.Compression.deflate.asJava
@@ -90,4 +101,16 @@ object Compression {
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString,
NotUsed] =
scaladsl.Compression.deflate(level, nowrap).asJava
+ /**
+ * Same as [[deflate]] with configurable level, nowrap and autoFlush.
+ *
+ * @param level Compression level (0-9)
+ * @param nowrap if true then use GZIP compatible compression
+ * @param autoFlush If true will automatically flush after every single
element in the stream.
+ *
+ * @since 1.3.0
+ */
+ def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean):
Flow[ByteString, ByteString, NotUsed] =
+ scaladsl.Compression.deflate(level, nowrap, autoFlush).asJava
+
}
diff --git
a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
index 19f1ae3e79..56b77034b5 100644
--- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
+++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Compression.scala
@@ -24,12 +24,10 @@ object Compression {
final val MaxBytesPerChunkDefault = 64 * 1024
/**
- * Creates a flow that gzip-compresses a stream of ByteStrings. Note that
the compressor
- * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is
guaranteed that every [[pekko.util.ByteString]]
- * coming out of the flow can be fully decompressed without waiting for
additional data. This may
- * come at a compression performance cost for very small chunks.
- *
- * FIXME: should strategy / flush mode be configurable? See
https://github.com/akka/akka/issues/21849
+ * Creates a flow that gzip-compresses a stream of ByteStrings. Note that
the compressor will
+ * flush after every single element in the stream so that it is guaranteed that
every [[pekko.util.ByteString]]
+ * coming out of the flow can be fully decompressed without waiting for
additional data. This may come at
+ * a compression performance cost for very small chunks.
*/
def gzip: Flow[ByteString, ByteString, NotUsed] =
gzip(Deflater.BEST_COMPRESSION)
@@ -41,6 +39,17 @@ object Compression {
def gzip(level: Int): Flow[ByteString, ByteString, NotUsed] =
CompressionUtils.compressorFlow(() => new GzipCompressor(level))
+ /**
+ * Same as [[gzip]] with a custom level and configurable flush mode.
+ *
+ * @param level Compression level (0-9)
+ * @param autoFlush If true will automatically flush after every single
element in the stream.
+ *
+ * @since 1.3.0
+ */
+ def gzip(level: Int, autoFlush: Boolean): Flow[ByteString, ByteString,
NotUsed] =
+ CompressionUtils.compressorFlow(() => new GzipCompressor(level), autoFlush)
+
/**
* Creates a Flow that decompresses a gzip-compressed stream of data.
*
@@ -60,14 +69,12 @@ object Compression {
Flow[ByteString].via(new
GzipDecompressor(maxBytesPerChunk)).named("gunzip")
/**
- * Creates a flow that deflate-compresses a stream of ByteString. Note that
the compressor
- * will SYNC_FLUSH after every [[pekko.util.ByteString]] so that it is
guaranteed that every [[pekko.util.ByteString]]
- * coming out of the flow can be fully decompressed without waiting for
additional data. This may
- * come at a compression performance cost for very small chunks.
- *
- * FIXME: should strategy / flush mode be configurable? See
https://github.com/akka/akka/issues/21849
+ * Creates a flow that deflate-compresses a stream of ByteString. Note that
the compressor will
+ * flush after every single element in the stream so that it is guaranteed that
every [[pekko.util.ByteString]]
+ * coming out of the flow can be fully decompressed without waiting for
additional data. This may come at
+ * a compression performance cost for very small chunks.
*/
- def deflate: Flow[ByteString, ByteString, NotUsed] =
deflate(Deflater.BEST_COMPRESSION, false)
+ def deflate: Flow[ByteString, ByteString, NotUsed] =
deflate(Deflater.BEST_COMPRESSION, nowrap = false)
/**
* Same as [[deflate]] with configurable level and nowrap
@@ -78,6 +85,18 @@ object Compression {
def deflate(level: Int, nowrap: Boolean): Flow[ByteString, ByteString,
NotUsed] =
CompressionUtils.compressorFlow(() => new DeflateCompressor(level, nowrap))
+ /**
+ * Same as [[deflate]] with configurable level, nowrap and autoFlush.
+ *
+ * @param level Compression level (0-9)
+ * @param nowrap if true then use GZIP compatible compression
+ * @param autoFlush If true will automatically flush after every single
element in the stream.
+ *
+ * @since 1.3.0
+ */
+ def deflate(level: Int, nowrap: Boolean, autoFlush: Boolean):
Flow[ByteString, ByteString, NotUsed] =
+ CompressionUtils.compressorFlow(() => new DeflateCompressor(level,
nowrap), autoFlush)
+
/**
* Creates a Flow that decompresses a deflate-compressed stream of data.
*
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]