This is an automated email from the ASF dual-hosted git repository.
jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b99be961b8a KAFKA-18206: EmbeddedKafkaCluster must set features
(#18189)
b99be961b8a is described below
commit b99be961b8a05e75b7225280c9592574423cbfe1
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Feb 6 01:14:36 2025 +0800
KAFKA-18206: EmbeddedKafkaCluster must set features (#18189)
Related to KAFKA-18206: set features in EmbeddedKafkaCluster in both the
streams and connect modules. Note that this PR also fixes a potential
transaction with empty records in the sendPrivileged method, as transaction
version 2 doesn't allow this kind of scenario.
Reviewers: Justine Olshan <[email protected]>
---
.../connect/integration/ExactlyOnceSourceIntegrationTest.java | 4 ++--
.../scala/integration/kafka/server/KRaftClusterTest.scala | 3 ++-
.../main/java/org/apache/kafka/common/test/TestKitNodes.java | 11 ++++++++++-
3 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 6bb12e8f178..494af3358e0 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
@@ -1225,7 +1225,7 @@ public class ExactlyOnceSourceIntegrationTest {
private void assertTransactionalProducerIsFenced(KafkaProducer<byte[],
byte[]> producer, String topic) {
producer.beginTransaction();
assertThrows(
- ProducerFencedException.class,
+ InvalidProducerEpochException.class,
() -> {
producer.send(new ProducerRecord<>(topic, new byte[] {69},
new byte[] {96}));
producer.commitTransaction();
diff --git
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 44bfa9f1734..de9ea1a72c0 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -696,7 +696,8 @@ class KRaftClusterTest {
new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"),
OpType.SET))))))
validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
("log.roll.ms", "1234567"),
- ("max.connections.per.ip", "60"))), exhaustive = true)
+ ("max.connections.per.ip", "60"),
+ ("min.insync.replicas", "1"))), exhaustive = true)
admin.createTopics(util.Arrays.asList(
new NewTopic("foo", 2, 3.toShort),
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index c78462c213a..d608972e1d7 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -25,6 +25,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
import org.apache.kafka.metadata.properties.MetaProperties;
import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.server.common.Feature;
import org.apache.kafka.server.common.MetadataVersion;
import java.io.File;
@@ -55,7 +56,15 @@ public class TestKitNodes {
private BootstrapMetadata bootstrapMetadata;
public Builder() {
-
this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+ this(BootstrapMetadata.fromVersions(
+ MetadataVersion.latestTesting(),
+ Feature.PRODUCTION_FEATURES.stream()
+ .collect(Collectors.toMap(
+ Feature::featureName,
+ feature ->
feature.defaultLevel(MetadataVersion.latestTesting()),
+ (existing, replacement) -> existing,
+ TreeMap::new)),
+ "testkit"));
}
public Builder(BootstrapMetadata bootstrapMetadata) {