This is an automated email from the ASF dual-hosted git repository.
nickpan47 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new 9d9ebc7 SAMZA-398: Remove force NONE compression for changelog topic producer
new d5f5494 Merge pull request #1492 from perkss/SAMZA-398-allow-compressed-changelog
9d9ebc7 is described below
commit 9d9ebc70d127c9deca35e09d1056835ad4f73983
Author: perkss <[email protected]>
AuthorDate: Tue Apr 13 21:30:15 2021 +0100
SAMZA-398: Remove force NONE compression for changelog topic producer
---
.../src/main/java/org/apache/samza/config/StorageConfig.java | 12 ------------
.../test/java/org/apache/samza/config/TestStorageConfig.java | 12 ------------
.../src/main/scala/org/apache/samza/config/KafkaConfig.scala | 4 +---
.../org/apache/samza/system/kafka/KafkaSystemFactory.scala | 11 +----------
.../apache/samza/system/kafka/TestKafkaSystemFactory.scala | 12 ------------
.../framework/StreamApplicationIntegrationTestHarness.java | 1 +
6 files changed, 3 insertions(+), 49 deletions(-)
diff --git
a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
index a8b8702..548cc27 100644
--- a/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
+++ b/samza-core/src/main/java/org/apache/samza/config/StorageConfig.java
@@ -29,7 +29,6 @@ import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.SamzaException;
import org.apache.samza.execution.StreamManager;
-import org.apache.samza.util.StreamUtil;
import static com.google.common.base.Preconditions.*;
@@ -241,17 +240,6 @@ public class StorageConfig extends MapConfig {
}
/**
- * Helper method to check if a system has a changelog attached to it.
- */
- public boolean isChangelogSystem(String systemName) {
- return getStoreNames().stream()
- .map(this::getChangelogStream)
- .filter(Optional::isPresent)
- .map(systemStreamName ->
StreamUtil.getSystemStreamFromNames(systemStreamName.get()).getSystem())
- .anyMatch(system -> system.equals(systemName));
- }
-
- /**
* Helper method to check if there is any stores configured w/ a changelog
*/
public boolean hasDurableStores() {
diff --git
a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
index 88fbbe0..ec9b263 100644
--- a/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
+++ b/samza-core/src/test/java/org/apache/samza/config/TestStorageConfig.java
@@ -258,18 +258,6 @@ public class TestStorageConfig {
}
@Test
- public void testIsChangelogSystem() {
- StorageConfig storageConfig = new StorageConfig(new
MapConfig(ImmutableMap.of(
- // store0 has a changelog stream
- String.format(StorageConfig.FACTORY, STORE_NAME0), "factory.class",
- String.format(CHANGELOG_STREAM, STORE_NAME0),
"system0.changelog-stream",
- // store1 does not have a changelog stream
- String.format(StorageConfig.FACTORY, STORE_NAME1), "factory.class")));
- assertTrue(storageConfig.isChangelogSystem("system0"));
- assertFalse(storageConfig.isChangelogSystem("other-system"));
- }
-
- @Test
public void testHasDurableStores() {
// no changelog, which means no durable stores
StorageConfig storageConfig = new StorageConfig(
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 69a9966..391536a 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -381,14 +381,12 @@ class KafkaConfig(config: Config) extends
ScalaMapConfig(config) {
}
def getKafkaSystemProducerConfig( systemName: String,
- clientId: String,
- injectedProps: Map[String, String] =
Map()) = {
+ clientId: String) = {
val subConf = config.subset("systems.%s.producer." format systemName, true)
val producerProps = new util.HashMap[String, String]()
producerProps.putAll(subConf)
producerProps.put("client.id", clientId)
- producerProps.putAll(injectedProps.asJava)
new KafkaProducerConfig(systemName, clientId, producerProps)
}
}
diff --git
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
index 8d1fd6b..a2773f6 100644
---
a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
+++
b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemFactory.scala
@@ -32,14 +32,6 @@ import scala.collection.JavaConverters._
import org.apache.samza.util._
object KafkaSystemFactory extends Logging {
- @VisibleForTesting
- def getInjectedProducerProperties(systemName: String, config: Config) = if
(new StorageConfig(config).isChangelogSystem(systemName)) {
- warn("System name '%s' is being used as a changelog. Disabling compression
since Kafka does not support compression for log compacted topics." format
systemName)
- Map[String, String]("compression.type" -> "none")
- } else {
- Map[String, String]()
- }
-
val CLIENTID_PRODUCER_PREFIX = "kafka-producer"
val CLIENTID_CONSUMER_PREFIX = "kafka-consumer"
val CLIENTID_ADMIN_PREFIX = "kafka-admin-consumer"
@@ -67,9 +59,8 @@ class KafkaSystemFactory extends SystemFactory with Logging {
}
def getProducer(systemName: String, config: Config, registry:
MetricsRegistry): SystemProducer = {
- val injectedProps =
KafkaSystemFactory.getInjectedProducerProperties(systemName, config)
val clientId =
KafkaConsumerConfig.createClientId(KafkaSystemFactory.CLIENTID_PRODUCER_PREFIX,
config);
- val producerConfig = config.getKafkaSystemProducerConfig(systemName,
clientId, injectedProps)
+ val producerConfig = config.getKafkaSystemProducerConfig(systemName,
clientId)
val getProducer = () => {
new KafkaProducer[Array[Byte],
Array[Byte]](producerConfig.getProducerProperties)
}
diff --git
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index 596d67b..ecbc00d 100644
---
a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++
b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -82,16 +82,4 @@ class TestKafkaSystemFactory {
assertNotNull(producer)
assertTrue(producer.isInstanceOf[KafkaSystemProducer])
}
-
- @Test
- def testInjectedProducerProps {
- val configMap = Map[String, String](
- StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
- StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
- StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
- val config = new MapConfig(configMap.asJava)
- assertEquals(Map[String, String](),
KafkaSystemFactory.getInjectedProducerProperties("system3", config))
- assertEquals(Map[String, String](),
KafkaSystemFactory.getInjectedProducerProperties("system2", config))
- assertEquals(Map[String, String]("compression.type" -> "none"),
KafkaSystemFactory.getInjectedProducerProperties("system1", config))
- }
}
diff --git
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
index 0d1e49a..012eece 100644
---
a/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
+++
b/samza-test/src/test/java/org/apache/samza/test/framework/StreamApplicationIntegrationTestHarness.java
@@ -188,6 +188,7 @@ public class StreamApplicationIntegrationTestHarness
extends IntegrationTestHarn
configMap.put("systems.kafka.samza.key.serde", "string");
configMap.put("systems.kafka.samza.msg.serde", "string");
configMap.put("systems.kafka.samza.offset.default", "oldest");
+ configMap.put("systems.kafka.producer.compression.type", "snappy");
configMap.put("job.coordinator.system", "kafka");
configMap.put("job.default.system", "kafka");
configMap.put("job.coordinator.replication.factor", "1");