This is an automated email from the ASF dual-hosted git repository.

jolshan pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b99be961b8a KAFKA-18206: EmbeddedKafkaCluster must set features 
(#18189)
b99be961b8a is described below

commit b99be961b8a05e75b7225280c9592574423cbfe1
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Thu Feb 6 01:14:36 2025 +0800

    KAFKA-18206: EmbeddedKafkaCluster must set features (#18189)
    
    Related to KAFKA-18206, this sets features in EmbeddedKafkaCluster in both 
the streams and connect modules. Note that this PR also fixes a potential 
transaction with empty records in the sendPrivileged method, as transaction 
version 2 doesn't allow this kind of scenario.
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../connect/integration/ExactlyOnceSourceIntegrationTest.java |  4 ++--
 .../scala/integration/kafka/server/KRaftClusterTest.scala     |  3 ++-
 .../main/java/org/apache/kafka/common/test/TestKitNodes.java  | 11 ++++++++++-
 3 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index 6bb12e8f178..494af3358e0 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -30,7 +30,7 @@ import org.apache.kafka.common.acl.AclOperation;
 import org.apache.kafka.common.acl.AclPermissionType;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
-import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.resource.PatternType;
 import org.apache.kafka.common.resource.ResourcePattern;
 import org.apache.kafka.common.resource.ResourceType;
@@ -1225,7 +1225,7 @@ public class ExactlyOnceSourceIntegrationTest {
     private void assertTransactionalProducerIsFenced(KafkaProducer<byte[], 
byte[]> producer, String topic) {
         producer.beginTransaction();
         assertThrows(
-                ProducerFencedException.class,
+                InvalidProducerEpochException.class,
                 () -> {
                     producer.send(new ProducerRecord<>(topic, new byte[] {69}, 
new byte[] {96}));
                     producer.commitTransaction();
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index 44bfa9f1734..de9ea1a72c0 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -696,7 +696,8 @@ class KRaftClusterTest {
             new AlterConfigOp(new ConfigEntry("max.connections.per.ip", "60"), 
OpType.SET))))))
         validateConfigs(admin, Map(new ConfigResource(Type.BROKER, "") -> Seq(
           ("log.roll.ms", "1234567"),
-          ("max.connections.per.ip", "60"))), exhaustive = true)
+          ("max.connections.per.ip", "60"),
+          ("min.insync.replicas", "1"))), exhaustive = true)
 
         admin.createTopics(util.Arrays.asList(
           new NewTopic("foo", 2, 3.toShort),
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index c78462c213a..d608972e1d7 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -25,6 +25,7 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.properties.MetaProperties;
 import org.apache.kafka.metadata.properties.MetaPropertiesEnsemble;
 import org.apache.kafka.metadata.properties.MetaPropertiesVersion;
+import org.apache.kafka.server.common.Feature;
 import org.apache.kafka.server.common.MetadataVersion;
 
 import java.io.File;
@@ -55,7 +56,15 @@ public class TestKitNodes {
         private BootstrapMetadata bootstrapMetadata;
 
         public Builder() {
-            
this(BootstrapMetadata.fromVersion(MetadataVersion.latestTesting(), "testkit"));
+            this(BootstrapMetadata.fromVersions(
+                    MetadataVersion.latestTesting(),
+                    Feature.PRODUCTION_FEATURES.stream()
+                            .collect(Collectors.toMap(
+                                    Feature::featureName,
+                                    feature -> 
feature.defaultLevel(MetadataVersion.latestTesting()),
+                                    (existing, replacement) -> existing,
+                                    TreeMap::new)),
+                    "testkit"));
         }
 
         public Builder(BootstrapMetadata bootstrapMetadata) {

Reply via email to