This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f586fa59d3f KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation (#13184)
f586fa59d3f is described below

commit f586fa59d3f938e04bda4e8143ddb1c4310eaf78
Author: Greg Harris <greg.har...@aiven.io>
AuthorDate: Tue Feb 28 08:23:19 2023 -0800

    KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation (#13184)
    
    Reviewers: Christo Lolov <christolo...@gmail.com>, Yash Mayya <yash.ma...@gmail.com>, Chris Egerton <chr...@aiven.io>
---
 .../kafka/connect/runtime/ConnectorConfig.java     | 28 +++++-----
 .../kafka/connect/runtime/TransformationChain.java | 27 +++++-----
 ...ransformation.java => TransformationStage.java} | 49 ++++++++----------
 .../org/apache/kafka/connect/runtime/Worker.java   |  6 +--
 .../rest/resources/ConnectorPluginsResource.java   |  9 +---
 .../kafka/connect/runtime/ConnectorConfigTest.java | 60 +++++++++++-----------
 .../connect/runtime/ErrorHandlingTaskTest.java     |  4 +-
 ...ationTest.java => TransformationStageTest.java} | 29 +++++++----
 .../resources/ConnectorPluginsResourceTest.java    |  3 +-
 .../kafka/connect/util/TopicCreationTest.java      | 16 +++---
 10 files changed, 113 insertions(+), 118 deletions(-)

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 485dda9b98b..40b7c0a1462 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -268,12 +268,14 @@ public class ConnectorConfig extends AbstractConfig {
     }
 
     /**
-     * Returns the initialized list of {@link Transformation} which are 
specified in {@link #TRANSFORMS_CONFIG}.
+     * Returns the initialized list of {@link TransformationStage} which apply 
the
+     * {@link Transformation transformations} and {@link Predicate predicates}
+     * as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link 
#PREDICATES_CONFIG}
      */
-    public <R extends ConnectRecord<R>> List<Transformation<R>> 
transformations() {
+    public <R extends ConnectRecord<R>> List<TransformationStage<R>> 
transformationStages() {
         final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
 
-        final List<Transformation<R>> transformations = new 
ArrayList<>(transformAliases.size());
+        final List<TransformationStage<R>> transformations = new 
ArrayList<>(transformAliases.size());
         for (String alias : transformAliases) {
             final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
 
@@ -281,17 +283,17 @@ public class ConnectorConfig extends AbstractConfig {
                 @SuppressWarnings("unchecked")
                 final Transformation<R> transformation = 
Utils.newInstance(getClass(prefix + "type"), Transformation.class);
                 Map<String, Object> configs = originalsWithPrefix(prefix);
-                Object predicateAlias = 
configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
-                Object negate = 
configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+                Object predicateAlias = 
configs.remove(TransformationStage.PREDICATE_CONFIG);
+                Object negate = 
configs.remove(TransformationStage.NEGATE_CONFIG);
                 transformation.configure(configs);
                 if (predicateAlias != null) {
                     String predicatePrefix = PREDICATES_PREFIX + 
predicateAlias + ".";
                     @SuppressWarnings("unchecked")
                     Predicate<R> predicate = 
Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
                     predicate.configure(originalsWithPrefix(predicatePrefix));
-                    transformations.add(new 
PredicatedTransformation<>(predicate, negate == null ? false : 
Boolean.parseBoolean(negate.toString()), transformation));
+                    transformations.add(new TransformationStage<>(predicate, 
negate == null ? false : Boolean.parseBoolean(negate.toString()), 
transformation));
                 } else {
-                    transformations.add(transformation);
+                    transformations.add(new 
TransformationStage<>(transformation));
                 }
             } catch (Exception e) {
                 throw new ConnectException(e);
@@ -321,9 +323,9 @@ public class ConnectorConfig extends AbstractConfig {
             protected ConfigDef initialConfigDef() {
                 // All Transformations get these config parameters implicitly
                 return super.initialConfigDef()
-                        .define(PredicatedTransformation.PREDICATE_CONFIG, 
Type.STRING, "", Importance.MEDIUM,
+                        .define(TransformationStage.PREDICATE_CONFIG, 
Type.STRING, null, Importance.MEDIUM,
                                 "The alias of a predicate used to determine 
whether to apply this transformation.")
-                        .define(PredicatedTransformation.NEGATE_CONFIG, 
Type.BOOLEAN, false, Importance.MEDIUM,
+                        .define(TransformationStage.NEGATE_CONFIG, 
Type.BOOLEAN, false, Importance.MEDIUM,
                                 "Whether the configured predicate should be 
negated.");
             }
 
@@ -332,8 +334,8 @@ public class ConnectorConfig extends AbstractConfig {
                 return super.configDefsForClass(typeConfig)
                     .filter(entry -> {
                         // The implicit parameters mask any from the 
transformer with the same name
-                        if 
(PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey())
-                                || 
PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey())) {
+                        if 
(TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
+                                || 
TransformationStage.NEGATE_CONFIG.equals(entry.getKey())) {
                             log.warn("Transformer config {} is masked by 
implicit config of that name",
                                     entry.getKey());
                             return false;
@@ -350,8 +352,8 @@ public class ConnectorConfig extends AbstractConfig {
 
             @Override
             protected void validateProps(String prefix) {
-                String prefixedNegate = prefix + 
PredicatedTransformation.NEGATE_CONFIG;
-                String prefixedPredicate = prefix + 
PredicatedTransformation.PREDICATE_CONFIG;
+                String prefixedNegate = prefix + 
TransformationStage.NEGATE_CONFIG;
+                String prefixedPredicate = prefix + 
TransformationStage.PREDICATE_CONFIG;
                 if (props.containsKey(prefixedNegate) &&
                         !props.containsKey(prefixedPredicate)) {
                     throw new ConfigException("Config '" + prefixedNegate + "' 
was provided " +
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index 984c1422572..b130a226f5f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
 import org.apache.kafka.connect.runtime.errors.Stage;
-import org.apache.kafka.connect.transforms.Transformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,24 +33,24 @@ import java.util.StringJoiner;
 public class TransformationChain<R extends ConnectRecord<R>> implements 
AutoCloseable {
     private static final Logger log = 
LoggerFactory.getLogger(TransformationChain.class);
 
-    private final List<Transformation<R>> transformations;
+    private final List<TransformationStage<R>> transformationStages;
     private final RetryWithToleranceOperator retryWithToleranceOperator;
 
-    public TransformationChain(List<Transformation<R>> transformations, 
RetryWithToleranceOperator retryWithToleranceOperator) {
-        this.transformations = transformations;
+    public TransformationChain(List<TransformationStage<R>> 
transformationStages, RetryWithToleranceOperator retryWithToleranceOperator) {
+        this.transformationStages = transformationStages;
         this.retryWithToleranceOperator = retryWithToleranceOperator;
     }
 
     public R apply(R record) {
-        if (transformations.isEmpty()) return record;
+        if (transformationStages.isEmpty()) return record;
 
-        for (final Transformation<R> transformation : transformations) {
+        for (final TransformationStage<R> transformationStage : 
transformationStages) {
             final R current = record;
 
             log.trace("Applying transformation {} to {}",
-                transformation.getClass().getName(), record);
+                transformationStage.transformClass().getName(), record);
             // execute the operation
-            record = retryWithToleranceOperator.execute(() -> 
transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
+            record = retryWithToleranceOperator.execute(() -> 
transformationStage.apply(current), Stage.TRANSFORMATION, 
transformationStage.transformClass());
 
             if (record == null) break;
         }
@@ -61,8 +60,8 @@ public class TransformationChain<R extends ConnectRecord<R>> 
implements AutoClos
 
     @Override
     public void close() {
-        for (Transformation<R> transformation : transformations) {
-            transformation.close();
+        for (TransformationStage<R> transformationStage : 
transformationStages) {
+            transformationStage.close();
         }
     }
 
@@ -71,18 +70,18 @@ public class TransformationChain<R extends 
ConnectRecord<R>> implements AutoClos
         if (this == o) return true;
         if (o == null || getClass() != o.getClass()) return false;
         TransformationChain<?> that = (TransformationChain<?>) o;
-        return Objects.equals(transformations, that.transformations);
+        return Objects.equals(transformationStages, that.transformationStages);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(transformations);
+        return Objects.hash(transformationStages);
     }
 
     public String toString() {
         StringJoiner chain = new StringJoiner(", ", getClass().getName() + 
"{", "}");
-        for (Transformation<R> transformation : transformations) {
-            chain.add(transformation.getClass().getName());
+        for (TransformationStage<R> transformationStage : 
transformationStages) {
+            chain.add(transformationStage.transformClass().getName());
         }
         return chain.toString();
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
similarity index 56%
rename from 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
rename to 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
index 446db5b2f32..3831730ad8f 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
@@ -16,65 +16,60 @@
  */
 package org.apache.kafka.connect.runtime;
 
-import java.util.Map;
 
-import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 
 /**
- * Decorator for a {@link Transformation} which applies the delegate only when 
a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link 
Predicate}
+ * which applies the transformation when the {@link Predicate} is true (or 
false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be 
unconditionally applied.
  * @param <R> The type of record (must be an implementation of {@link 
ConnectRecord})
  */
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements 
Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements 
AutoCloseable {
 
     static final String PREDICATE_CONFIG = "predicate";
     static final String NEGATE_CONFIG = "negate";
-    final Predicate<R> predicate;
-    final Transformation<R> delegate;
-    final boolean negate;
+    private final Predicate<R> predicate;
+    private final Transformation<R> transformation;
+    private final boolean negate;
 
-    PredicatedTransformation(Predicate<R> predicate, boolean negate, 
Transformation<R> delegate) {
+    TransformationStage(Transformation<R> transformation) {
+        this(null, false, transformation);
+    }
+
+    TransformationStage(Predicate<R> predicate, boolean negate, 
Transformation<R> transformation) {
         this.predicate = predicate;
         this.negate = negate;
-        this.delegate = delegate;
+        this.transformation = transformation;
     }
 
-    @Override
-    public void configure(Map<String, ?> configs) {
-        throw new ConnectException(PredicatedTransformation.class.getName() + 
".configure() " +
-                "should never be called directly.");
+    public Class<? extends Transformation<R>> transformClass() {
+        @SuppressWarnings("unchecked")
+        Class<? extends Transformation<R>> transformClass = (Class<? extends 
Transformation<R>>) transformation.getClass();
+        return transformClass;
     }
 
-    @Override
     public R apply(R record) {
-        if (negate ^ predicate.test(record)) {
-            return delegate.apply(record);
+        if (predicate == null || negate ^ predicate.test(record)) {
+            return transformation.apply(record);
         }
         return record;
     }
 
-    @Override
-    public ConfigDef config() {
-        throw new ConnectException(PredicatedTransformation.class.getName() + 
".config() " +
-                "should never be called directly.");
-    }
-
     @Override
     public void close() {
-        Utils.closeQuietly(delegate, "predicated transformation");
+        Utils.closeQuietly(transformation, "transformation");
         Utils.closeQuietly(predicate, "predicate");
     }
 
     @Override
     public String toString() {
-        return "PredicatedTransformation{" +
+        return "TransformationStage{" +
                 "predicate=" + predicate +
-                ", delegate=" + delegate +
+                ", transformation=" + transformation +
                 ", negate=" + negate +
                 '}';
     }
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f966ea4cb55..8f9f727e25e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1253,7 +1253,7 @@ public class Worker {
                            Class<? extends Connector> connectorClass,
                            RetryWithToleranceOperator 
retryWithToleranceOperator) {
 
-            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformations(), 
retryWithToleranceOperator);
+            TransformationChain<SinkRecord> transformationChain = new 
TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), 
retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
             SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, 
connectorConfig.originalsStrings());
             retryWithToleranceOperator.reporters(sinkTaskReporters(id, 
sinkConfig, errorHandlingMetrics, connectorClass));
@@ -1297,7 +1297,7 @@ public class Worker {
             SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
                     connectorConfig.originalsStrings(), 
config.topicCreationEnable());
             retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), 
retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
 
             Map<String, Object> producerProps = 
baseProducerConfigs(id.connector(), "connector-producer-" + id, config, 
sourceConfig, connectorClass,
@@ -1365,7 +1365,7 @@ public class Worker {
             SourceConnectorConfig sourceConfig = new 
SourceConnectorConfig(plugins,
                     connectorConfig.originalsStrings(), 
config.topicCreationEnable());
             retryWithToleranceOperator.reporters(sourceTaskReporters(id, 
sourceConfig, errorHandlingMetrics));
-            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformations(), 
retryWithToleranceOperator);
+            TransformationChain<SourceRecord> transformationChain = new 
TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), 
retryWithToleranceOperator);
             log.info("Initializing: {}", transformationChain);
 
             Map<String, Object> producerProps = 
exactlyOnceSourceTaskProducerConfigs(
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 05b8375183c..ad8ff00cb96 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -20,7 +20,6 @@ import io.swagger.v3.oas.annotations.Operation;
 import io.swagger.v3.oas.annotations.Parameter;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.PredicatedTransformation;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.PluginType;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -34,7 +33,6 @@ import org.apache.kafka.connect.tools.MockSourceConnector;
 import org.apache.kafka.connect.tools.SchemaSourceConnector;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
-import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.FutureCallback;
 
 import javax.ws.rs.BadRequestException;
@@ -79,11 +77,6 @@ public class ConnectorPluginsResource implements 
ConnectResource {
             SchemaSourceConnector.class
     );
 
-    @SuppressWarnings({"unchecked", "rawtypes"})
-    static final List<Class<? extends Transformation<?>>> TRANSFORM_EXCLUDES = 
Collections.singletonList(
-            (Class) PredicatedTransformation.class
-    );
-
     public ConnectorPluginsResource(Herder herder) {
         this.herder = herder;
         this.connectorPlugins = new ArrayList<>();
@@ -92,7 +85,7 @@ public class ConnectorPluginsResource implements 
ConnectResource {
         // TODO: improve once plugins are allowed to be added/removed during 
runtime.
         addConnectorPlugins(herder.plugins().sinkConnectors(), 
SINK_CONNECTOR_EXCLUDES);
         addConnectorPlugins(herder.plugins().sourceConnectors(), 
SOURCE_CONNECTOR_EXCLUDES);
-        addConnectorPlugins(herder.plugins().transformations(), 
TRANSFORM_EXCLUDES);
+        addConnectorPlugins(herder.plugins().transformations(), 
Collections.emptySet());
         addConnectorPlugins(herder.plugins().predicates(), 
Collections.emptySet());
         addConnectorPlugins(herder.plugins().converters(), 
Collections.emptySet());
         addConnectorPlugins(herder.plugins().headerConverters(), 
Collections.emptySet());
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 4abdbeaa4e5..d8c071e6c2f 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.junit.Test;
@@ -48,6 +49,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
         }
     };
 
+    private static final SinkRecord DUMMY_RECORD = new SinkRecord(null, 0, 
null, null, null, null, 0L);
+
     public static abstract class TestConnector extends Connector {
     }
 
@@ -62,7 +65,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
 
         @Override
         public R apply(R record) {
-            return null;
+            return record.newRecord(null, magicNumber, null, null, null, null, 
0L);
         }
 
         @Override
@@ -147,10 +150,11 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         props.put("transforms.a.type", SimpleTransformation.class.getName());
         props.put("transforms.a.magic.number", "42");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
-        final List<Transformation<R>> transformations = 
config.transformations();
-        assertEquals(1, transformations.size());
-        final SimpleTransformation<R> xform = (SimpleTransformation<R>) 
transformations.get(0);
-        assertEquals(42, xform.magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = 
config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        final TransformationStage<SinkRecord> stage = 
transformationStages.get(0);
+        assertEquals(SimpleTransformation.class, stage.transformClass());
+        assertEquals(42, 
stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
     }
 
     @Test
@@ -175,10 +179,10 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         props.put("transforms.b.type", SimpleTransformation.class.getName());
         props.put("transforms.b.magic.number", "84");
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
-        final List<Transformation<R>> transformations = 
config.transformations();
-        assertEquals(2, transformations.size());
-        assertEquals(42, ((SimpleTransformation<R>) 
transformations.get(0)).magicNumber);
-        assertEquals(84, ((SimpleTransformation<R>) 
transformations.get(1)).magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = 
config.transformationStages();
+        assertEquals(2, transformationStages.size());
+        assertEquals(42, 
transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue());
+        assertEquals(84, 
transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue());
     }
 
     @Test
@@ -246,7 +250,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         props.put("predicates", "my-pred");
         props.put("predicates.my-pred.type", TestPredicate.class.getName());
         props.put("predicates.my-pred.int", "84");
-        assertPredicatedTransform(props, true);
+        assertTransformationStageWithPredicate(props, true);
     }
 
     @Test
@@ -261,7 +265,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         props.put("predicates", "my-pred");
         props.put("predicates.my-pred.type", TestPredicate.class.getName());
         props.put("predicates.my-pred.int", "84");
-        assertPredicatedTransform(props, false);
+        assertTransformationStageWithPredicate(props, false);
     }
 
     @Test
@@ -280,25 +284,19 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         assertTrue(e.getMessage().contains("Predicate is abstract and cannot 
be created"));
     }
 
-    private void assertPredicatedTransform(Map<String, String> props, boolean 
expectedNegated) {
+    private void assertTransformationStageWithPredicate(Map<String, String> 
props, boolean expectedNegated) {
         final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, 
props);
-        final List<Transformation<R>> transformations = 
config.transformations();
-        assertEquals(1, transformations.size());
-        assertTrue(transformations.get(0) instanceof PredicatedTransformation);
-        PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) 
transformations.get(0);
-
-        assertEquals(expectedNegated, predicated.negate);
-
-        assertTrue(predicated.delegate instanceof 
ConnectorConfigTest.SimpleTransformation);
-        assertEquals(42, ((SimpleTransformation<?>) 
predicated.delegate).magicNumber);
+        final List<TransformationStage<SinkRecord>> transformationStages = 
config.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SinkRecord> stage = transformationStages.get(0);
 
-        assertTrue(predicated.predicate instanceof 
ConnectorConfigTest.TestPredicate);
-        assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+        assertEquals(expectedNegated ? 42 : 0, 
stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
 
-        predicated.close();
+        SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, 
null, null, null, 0L);
+        assertEquals(expectedNegated ? 84 : 42, 
stage.apply(matchingRecord).kafkaPartition().intValue());
+        assertEquals(SimpleTransformation.class, stage.transformClass());
 
-        assertEquals(0, ((SimpleTransformation<?>) 
predicated.delegate).magicNumber);
-        assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);
+        stage.close();
     }
 
     @Test
@@ -381,7 +379,7 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
 
         @Override
         public boolean test(R record) {
-            return false;
+            return record.kafkaPartition() == param;
         }
 
         @Override
@@ -445,8 +443,8 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         props.put(prefix + "type", 
HasDuplicateConfigTransformation.class.getName());
         ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), 
props, false);
         assertEnrichedConfigDef(def, prefix, 
HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
-        assertEnrichedConfigDef(def, prefix, 
PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
-        assertEnrichedConfigDef(def, prefix, 
PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+        assertEnrichedConfigDef(def, prefix, 
TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+        assertEnrichedConfigDef(def, prefix, 
TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
     }
 
     private static void assertEnrichedConfigDef(ConfigDef def, String prefix, 
String keyName, ConfigDef.Type expectedType) {
@@ -460,9 +458,9 @@ public class ConnectorConfigTest<R extends 
ConnectRecord<R>> {
         private static final String MUST_EXIST_KEY = "must.exist.key";
         private static final ConfigDef CONFIG_DEF = new ConfigDef()
                 // this configDef is duplicate. It should be removed 
automatically so as to avoid duplicate config error.
-                .define(PredicatedTransformation.PREDICATE_CONFIG, 
ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, 
"fake")
+                .define(TransformationStage.PREDICATE_CONFIG, 
ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, 
"fake")
                 // this configDef is duplicate. It should be removed 
automatically so as to avoid duplicate config error.
-                .define(PredicatedTransformation.NEGATE_CONFIG, 
ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake")
+                .define(TransformationStage.NEGATE_CONFIG, ConfigDef.Type.INT, 
123, ConfigDef.Importance.MEDIUM, "fake")
                 // this configDef should appear if above duplicate configDef 
is removed without any error
                 .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, 
ConfigDef.Importance.MEDIUM, "this key must exist");
 
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 4de13128dbc..3a0090f2267 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -502,7 +502,7 @@ public class ErrorHandlingTaskTest {
         converter.configure(oo);
 
         TransformationChain<SinkRecord> sinkTransforms =
-                new TransformationChain<>(singletonList(new 
FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
+                new TransformationChain<>(singletonList(new 
TransformationStage<>(new FaultyPassthrough<SinkRecord>())), 
retryWithToleranceOperator);
 
         workerSinkTask = new WorkerSinkTask(
             taskId, sinkTask, statusListener, initialState, workerConfig,
@@ -532,7 +532,7 @@ public class ErrorHandlingTaskTest {
     }
 
     private void createSourceTask(TargetState initialState, 
RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
-        TransformationChain<SourceRecord> sourceTransforms = new 
TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), 
retryWithToleranceOperator);
+        TransformationChain<SourceRecord> sourceTransforms = new 
TransformationChain<>(singletonList(new TransformationStage<>(new 
FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator);
 
         workerSourceTask = spy(new WorkerSourceTask(
             taskId, sourceTask, statusListener, initialState, converter,
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
similarity index 63%
rename from 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java
rename to 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
index 8bce328817a..d31e8563f8c 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
@@ -17,13 +17,18 @@
 package org.apache.kafka.connect.runtime;
 
 import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
 import org.junit.Test;
 
 import static java.util.Collections.singletonMap;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class PredicatedTransformationTest {
+public class TransformationStageTest {
 
     private final SourceRecord initial = new SourceRecord(singletonMap("initial", 1), null, null, null, null);
     private final SourceRecord transformed = new SourceRecord(singletonMap("transformed", 2), null, null, null, null);
@@ -39,17 +44,21 @@ public class PredicatedTransformationTest {
     private void applyAndAssert(boolean predicateResult, boolean negate,
                                 SourceRecord expectedResult) {
 
-        SamplePredicate predicate = new SamplePredicate(predicateResult);
-        SampleTransformation<SourceRecord> predicatedTransform = new SampleTransformation<>(transformed);
-        PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>(
+        @SuppressWarnings("unchecked")
+        Predicate<SourceRecord> predicate = mock(Predicate.class);
+        when(predicate.test(any())).thenReturn(predicateResult);
+        @SuppressWarnings("unchecked")
+        Transformation<SourceRecord> transformation = mock(Transformation.class);
+        when(transformation.apply(any())).thenReturn(transformed);
+        TransformationStage<SourceRecord> stage = new TransformationStage<>(
                 predicate,
                 negate,
-                predicatedTransform);
+                transformation);
 
-        assertEquals(expectedResult, pt.apply(initial));
+        assertEquals(expectedResult, stage.apply(initial));
 
-        pt.close();
-        assertTrue(predicate.closed);
-        assertTrue(predicatedTransform.closed);
+        stage.close();
+        verify(predicate).close();
+        verify(transformation).close();
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 59cf83ca9ae..63e7c27c92b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -387,8 +387,7 @@ public class ConnectorPluginsResourceTest {
     public void testListAllPlugins() {
         Set<Class<?>> excludes = Stream.of(
                         ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
-                        ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES,
-                        ConnectorPluginsResource.TRANSFORM_EXCLUDES
+                        ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES
                 ).flatMap(Collection::stream)
                 .collect(Collectors.toSet());
         Set<PluginInfo> expectedConnectorPlugins = Stream.of(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
index feb0e5f5ac9..af11782a041 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.util;
 
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.TransformationStage;
 import org.apache.kafka.connect.runtime.SourceConnectorConfig;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -26,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.transforms.Cast;
 import org.apache.kafka.connect.transforms.RegexRouter;
-import org.apache.kafka.connect.transforms.Transformation;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -516,9 +516,9 @@ public class TopicCreationTest {
         topicCreation.addTopic(FOO_TOPIC);
         assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
 
-        List<Transformation<SourceRecord>> transformations = sourceConfig.transformations();
-        assertEquals(1, transformations.size());
-        Cast<SourceRecord> xform = (Cast<SourceRecord>) transformations.get(0);
+        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages();
+        assertEquals(1, transformationStages.size());
+        TransformationStage<SourceRecord> xform = transformationStages.get(0);
         SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
         assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
         assertEquals((byte) 42, transformed.value());
@@ -623,15 +623,15 @@ public class TopicCreationTest {
         assertEquals(barPartitions, barTopicSpec.numPartitions());
         assertThat(barTopicSpec.configs(), is(barTopicProps));
 
-        List<Transformation<SourceRecord>> transformations = sourceConfig.transformations();
-        assertEquals(2, transformations.size());
+        List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages();
+        assertEquals(2, transformationStages.size());
 
-        Cast<SourceRecord> castXForm = (Cast<SourceRecord>) transformations.get(0);
+        TransformationStage<SourceRecord> castXForm = transformationStages.get(0);
         SourceRecord transformed = castXForm.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
         assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
         assertEquals((byte) 42, transformed.value());
 
-        RegexRouter<SourceRecord> regexRouterXForm = (RegexRouter<SourceRecord>) transformations.get(1);
+        TransformationStage<SourceRecord> regexRouterXForm = transformationStages.get(1);
         transformed = regexRouterXForm.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
         assertEquals("prefix-topic", transformed.topic());
     }


Reply via email to