This is an automated email from the ASF dual-hosted git repository. cegerton pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new f586fa59d3f KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation (#13184) f586fa59d3f is described below commit f586fa59d3f938e04bda4e8143ddb1c4310eaf78 Author: Greg Harris <greg.har...@aiven.io> AuthorDate: Tue Feb 28 08:23:19 2023 -0800 KAFKA-14671: Refactor PredicatedTransformation to not implement Transformation (#13184) Reviewers: Christo Lolov <christolo...@gmail.com>, Yash Mayya <yash.ma...@gmail.com>, Chris Egerton <chr...@aiven.io> --- .../kafka/connect/runtime/ConnectorConfig.java | 28 +++++----- .../kafka/connect/runtime/TransformationChain.java | 27 +++++----- ...ransformation.java => TransformationStage.java} | 49 ++++++++---------- .../org/apache/kafka/connect/runtime/Worker.java | 6 +-- .../rest/resources/ConnectorPluginsResource.java | 9 +--- .../kafka/connect/runtime/ConnectorConfigTest.java | 60 +++++++++++----------- .../connect/runtime/ErrorHandlingTaskTest.java | 4 +- ...ationTest.java => TransformationStageTest.java} | 29 +++++++---- .../resources/ConnectorPluginsResourceTest.java | 3 +- .../kafka/connect/util/TopicCreationTest.java | 16 +++--- 10 files changed, 113 insertions(+), 118 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 485dda9b98b..40b7c0a1462 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -268,12 +268,14 @@ public class ConnectorConfig extends AbstractConfig { } /** - * Returns the initialized list of {@link Transformation} which are specified in {@link #TRANSFORMS_CONFIG}. + * Returns the initialized list of {@link TransformationStage} which apply the + * {@link Transformation transformations} and {@link Predicate predicates} + * as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link #PREDICATES_CONFIG} */ - public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() { + public <R extends ConnectRecord<R>> List<TransformationStage<R>> transformationStages() { final List<String> transformAliases = getList(TRANSFORMS_CONFIG); - final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size()); + final List<TransformationStage<R>> transformations = new ArrayList<>(transformAliases.size()); for (String alias : transformAliases) { final String prefix = TRANSFORMS_CONFIG + "." + alias + "."; @@ -281,17 +283,17 @@ public class ConnectorConfig extends AbstractConfig { @SuppressWarnings("unchecked") final Transformation<R> transformation = Utils.newInstance(getClass(prefix + "type"), Transformation.class); Map<String, Object> configs = originalsWithPrefix(prefix); - Object predicateAlias = configs.remove(PredicatedTransformation.PREDICATE_CONFIG); - Object negate = configs.remove(PredicatedTransformation.NEGATE_CONFIG); + Object predicateAlias = configs.remove(TransformationStage.PREDICATE_CONFIG); + Object negate = configs.remove(TransformationStage.NEGATE_CONFIG); transformation.configure(configs); if (predicateAlias != null) { String predicatePrefix = PREDICATES_PREFIX + predicateAlias + "."; @SuppressWarnings("unchecked") Predicate<R> predicate = Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class); predicate.configure(originalsWithPrefix(predicatePrefix)); - transformations.add(new PredicatedTransformation<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation)); + transformations.add(new TransformationStage<>(predicate, negate == null ? false : Boolean.parseBoolean(negate.toString()), transformation)); } else { - transformations.add(transformation); + transformations.add(new TransformationStage<>(transformation)); } } catch (Exception e) { throw new ConnectException(e); @@ -321,9 +323,9 @@ public class ConnectorConfig extends AbstractConfig { protected ConfigDef initialConfigDef() { // All Transformations get these config parameters implicitly return super.initialConfigDef() - .define(PredicatedTransformation.PREDICATE_CONFIG, Type.STRING, "", Importance.MEDIUM, + .define(TransformationStage.PREDICATE_CONFIG, Type.STRING, null, Importance.MEDIUM, "The alias of a predicate used to determine whether to apply this transformation.") - .define(PredicatedTransformation.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, + .define(TransformationStage.NEGATE_CONFIG, Type.BOOLEAN, false, Importance.MEDIUM, "Whether the configured predicate should be negated."); } @@ -332,8 +334,8 @@ public class ConnectorConfig extends AbstractConfig { return super.configDefsForClass(typeConfig) .filter(entry -> { // The implicit parameters mask any from the transformer with the same name - if (PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey()) - || PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey())) { + if (TransformationStage.PREDICATE_CONFIG.equals(entry.getKey()) + || TransformationStage.NEGATE_CONFIG.equals(entry.getKey())) { log.warn("Transformer config {} is masked by implicit config of that name", entry.getKey()); return false; @@ -350,8 +352,8 @@ public class ConnectorConfig extends AbstractConfig { @Override protected void validateProps(String prefix) { - String prefixedNegate = prefix + PredicatedTransformation.NEGATE_CONFIG; - String prefixedPredicate = prefix + PredicatedTransformation.PREDICATE_CONFIG; + String prefixedNegate = prefix + TransformationStage.NEGATE_CONFIG; + String prefixedPredicate = prefix + TransformationStage.PREDICATE_CONFIG; if (props.containsKey(prefixedNegate) && !props.containsKey(prefixedPredicate)) { throw new ConfigException("Config '" + prefixedNegate + "' was provided " + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java index 984c1422572..b130a226f5f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java @@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; -import org.apache.kafka.connect.transforms.Transformation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,24 +33,24 @@ import java.util.StringJoiner; public class TransformationChain<R extends ConnectRecord<R>> implements AutoCloseable { private static final Logger log = LoggerFactory.getLogger(TransformationChain.class); - private final List<Transformation<R>> transformations; + private final List<TransformationStage<R>> transformationStages; private final RetryWithToleranceOperator retryWithToleranceOperator; - public TransformationChain(List<Transformation<R>> transformations, RetryWithToleranceOperator retryWithToleranceOperator) { - this.transformations = transformations; + public TransformationChain(List<TransformationStage<R>> transformationStages, RetryWithToleranceOperator retryWithToleranceOperator) { + this.transformationStages = transformationStages; this.retryWithToleranceOperator = retryWithToleranceOperator; } public R apply(R record) { - if (transformations.isEmpty()) return record; + if (transformationStages.isEmpty()) return record; - for (final Transformation<R> transformation : transformations) { + for (final TransformationStage<R> transformationStage : transformationStages) { final R current = record; log.trace("Applying transformation {} to {}", - transformation.getClass().getName(), record); + transformationStage.transformClass().getName(), record); // execute the operation - record = retryWithToleranceOperator.execute(() -> transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass()); + record = retryWithToleranceOperator.execute(() -> transformationStage.apply(current), Stage.TRANSFORMATION, transformationStage.transformClass()); if (record == null) break; } @@ -61,8 +60,8 @@ public class TransformationChain<R extends ConnectRecord<R>> implements AutoClos @Override public void close() { - for (Transformation<R> transformation : transformations) { - transformation.close(); + for (TransformationStage<R> transformationStage : transformationStages) { + transformationStage.close(); } } @@ -71,18 +70,18 @@ public class TransformationChain<R extends ConnectRecord<R>> implements AutoClos if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; TransformationChain<?> that = (TransformationChain<?>) o; - return Objects.equals(transformations, that.transformations); + return Objects.equals(transformationStages, that.transformationStages); } @Override public int hashCode() { - return Objects.hash(transformations); + return Objects.hash(transformationStages); } public String toString() { StringJoiner chain = new StringJoiner(", ", getClass().getName() + "{", "}"); - for (Transformation<R> transformation : transformations) { - chain.add(transformation.getClass().getName()); + for (TransformationStage<R> transformationStage : transformationStages) { + chain.add(transformationStage.transformClass().getName()); } return chain.toString(); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java similarity index 56% rename from connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java rename to connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java index 446db5b2f32..3831730ad8f 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java @@ -16,65 +16,60 @@ */ package org.apache.kafka.connect.runtime; -import java.util.Map; -import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; /** - * Decorator for a {@link Transformation} which applies the delegate only when a - * {@link Predicate} is true (or false, according to {@code negate}). + * Wrapper for a {@link Transformation} and corresponding optional {@link Predicate} + * which applies the transformation when the {@link Predicate} is true (or false, according to {@code negate}). + * If no {@link Predicate} is provided, the transformation will be unconditionally applied. * @param <R> The type of record (must be an implementation of {@link ConnectRecord}) */ -public class PredicatedTransformation<R extends ConnectRecord<R>> implements Transformation<R> { +public class TransformationStage<R extends ConnectRecord<R>> implements AutoCloseable { static final String PREDICATE_CONFIG = "predicate"; static final String NEGATE_CONFIG = "negate"; - final Predicate<R> predicate; - final Transformation<R> delegate; - final boolean negate; + private final Predicate<R> predicate; + private final Transformation<R> transformation; + private final boolean negate; - PredicatedTransformation(Predicate<R> predicate, boolean negate, Transformation<R> delegate) { + TransformationStage(Transformation<R> transformation) { + this(null, false, transformation); + } + + TransformationStage(Predicate<R> predicate, boolean negate, Transformation<R> transformation) { this.predicate = predicate; this.negate = negate; - this.delegate = delegate; + this.transformation = transformation; } - @Override - public void configure(Map<String, ?> configs) { - throw new ConnectException(PredicatedTransformation.class.getName() + ".configure() " + - "should never be called directly."); + public Class<? extends Transformation<R>> transformClass() { + @SuppressWarnings("unchecked") + Class<? extends Transformation<R>> transformClass = (Class<? extends Transformation<R>>) transformation.getClass(); + return transformClass; } - @Override public R apply(R record) { - if (negate ^ predicate.test(record)) { - return delegate.apply(record); + if (predicate == null || negate ^ predicate.test(record)) { + return transformation.apply(record); } return record; } - @Override - public ConfigDef config() { - throw new ConnectException(PredicatedTransformation.class.getName() + ".config() " + - "should never be called directly."); - } - @Override public void close() { - Utils.closeQuietly(delegate, "predicated transformation"); + Utils.closeQuietly(transformation, "transformation"); Utils.closeQuietly(predicate, "predicate"); } @Override public String toString() { - return "PredicatedTransformation{" + + return "TransformationStage{" + "predicate=" + predicate + - ", delegate=" + delegate + + ", transformation=" + transformation + ", negate=" + negate + '}'; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index f966ea4cb55..8f9f727e25e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -1253,7 +1253,7 @@ public class Worker { Class<? extends Connector> connectorClass, RetryWithToleranceOperator retryWithToleranceOperator) { - TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connectorConfig.originalsStrings()); retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics, connectorClass)); @@ -1297,7 +1297,7 @@ public class Worker { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); - TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); Map<String, Object> producerProps = baseProducerConfigs(id.connector(), "connector-producer-" + id, config, sourceConfig, connectorClass, @@ -1365,7 +1365,7 @@ public class Worker { SourceConnectorConfig sourceConfig = new SourceConnectorConfig(plugins, connectorConfig.originalsStrings(), config.topicCreationEnable()); retryWithToleranceOperator.reporters(sourceTaskReporters(id, sourceConfig, errorHandlingMetrics)); - TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(), retryWithToleranceOperator); log.info("Initializing: {}", transformationChain); Map<String, Object> producerProps = exactlyOnceSourceTaskProducerConfigs( diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 05b8375183c..ad8ff00cb96 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -20,7 +20,6 @@ import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.PredicatedTransformation; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; @@ -34,7 +33,6 @@ import org.apache.kafka.connect.tools.MockSourceConnector; import org.apache.kafka.connect.tools.SchemaSourceConnector; import org.apache.kafka.connect.tools.VerifiableSinkConnector; import org.apache.kafka.connect.tools.VerifiableSourceConnector; -import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.util.FutureCallback; import javax.ws.rs.BadRequestException; @@ -79,11 +77,6 @@ public class ConnectorPluginsResource implements ConnectResource { SchemaSourceConnector.class ); - @SuppressWarnings({"unchecked", "rawtypes"}) - static final List<Class<? extends Transformation<?>>> TRANSFORM_EXCLUDES = Collections.singletonList( - (Class) PredicatedTransformation.class - ); - public ConnectorPluginsResource(Herder herder) { this.herder = herder; this.connectorPlugins = new ArrayList<>(); @@ -92,7 +85,7 @@ public class ConnectorPluginsResource implements ConnectResource { // TODO: improve once plugins are allowed to be added/removed during runtime. addConnectorPlugins(herder.plugins().sinkConnectors(), SINK_CONNECTOR_EXCLUDES); addConnectorPlugins(herder.plugins().sourceConnectors(), SOURCE_CONNECTOR_EXCLUDES); - addConnectorPlugins(herder.plugins().transformations(), TRANSFORM_EXCLUDES); + addConnectorPlugins(herder.plugins().transformations(), Collections.emptySet()); addConnectorPlugins(herder.plugins().predicates(), Collections.emptySet()); addConnectorPlugins(herder.plugins().converters(), Collections.emptySet()); addConnectorPlugins(herder.plugins().headerConverters(), Collections.emptySet()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java index 4abdbeaa4e5..d8c071e6c2f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.connector.Connector; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.Plugins; +import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.Transformation; import org.apache.kafka.connect.transforms.predicates.Predicate; import org.junit.Test; @@ -48,6 +49,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { } }; + private static final SinkRecord DUMMY_RECORD = new SinkRecord(null, 0, null, null, null, null, 0L); + public static abstract class TestConnector extends Connector { } @@ -62,7 +65,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { @Override public R apply(R record) { - return null; + return record.newRecord(null, magicNumber, null, null, null, null, 0L); } @Override @@ -147,10 +150,11 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("transforms.a.type", SimpleTransformation.class.getName()); props.put("transforms.a.magic.number", "42"); final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List<Transformation<R>> transformations = config.transformations(); - assertEquals(1, transformations.size()); - final SimpleTransformation<R> xform = (SimpleTransformation<R>) transformations.get(0); - assertEquals(42, xform.magicNumber); + final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(); + assertEquals(1, transformationStages.size()); + final TransformationStage<SinkRecord> stage = transformationStages.get(0); + assertEquals(SimpleTransformation.class, stage.transformClass()); + assertEquals(42, stage.apply(DUMMY_RECORD).kafkaPartition().intValue()); } @Test @@ -175,10 +179,10 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("transforms.b.type", SimpleTransformation.class.getName()); props.put("transforms.b.magic.number", "84"); final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List<Transformation<R>> transformations = config.transformations(); - assertEquals(2, transformations.size()); - assertEquals(42, ((SimpleTransformation<R>) transformations.get(0)).magicNumber); - assertEquals(84, ((SimpleTransformation<R>) transformations.get(1)).magicNumber); + final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(); + assertEquals(2, transformationStages.size()); + assertEquals(42, transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue()); + assertEquals(84, transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue()); } @Test @@ -246,7 +250,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("predicates", "my-pred"); props.put("predicates.my-pred.type", TestPredicate.class.getName()); props.put("predicates.my-pred.int", "84"); - assertPredicatedTransform(props, true); + assertTransformationStageWithPredicate(props, true); } @Test @@ -261,7 +265,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put("predicates", "my-pred"); props.put("predicates.my-pred.type", TestPredicate.class.getName()); props.put("predicates.my-pred.int", "84"); - assertPredicatedTransform(props, false); + assertTransformationStageWithPredicate(props, false); } @Test @@ -280,25 +284,19 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { assertTrue(e.getMessage().contains("Predicate is abstract and cannot be created")); } - private void assertPredicatedTransform(Map<String, String> props, boolean expectedNegated) { + private void assertTransformationStageWithPredicate(Map<String, String> props, boolean expectedNegated) { final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS, props); - final List<Transformation<R>> transformations = config.transformations(); - assertEquals(1, transformations.size()); - assertTrue(transformations.get(0) instanceof PredicatedTransformation); - PredicatedTransformation<?> predicated = (PredicatedTransformation<?>) transformations.get(0); - - assertEquals(expectedNegated, predicated.negate); - - assertTrue(predicated.delegate instanceof ConnectorConfigTest.SimpleTransformation); - assertEquals(42, ((SimpleTransformation<?>) predicated.delegate).magicNumber); + final List<TransformationStage<SinkRecord>> transformationStages = config.transformationStages(); + assertEquals(1, transformationStages.size()); + TransformationStage<SinkRecord> stage = transformationStages.get(0); - assertTrue(predicated.predicate instanceof ConnectorConfigTest.TestPredicate); - assertEquals(84, ((TestPredicate<?>) predicated.predicate).param); + assertEquals(expectedNegated ? 42 : 0, stage.apply(DUMMY_RECORD).kafkaPartition().intValue()); - predicated.close(); + SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null, null, null, null, 0L); + assertEquals(expectedNegated ? 84 : 42, stage.apply(matchingRecord).kafkaPartition().intValue()); + assertEquals(SimpleTransformation.class, stage.transformClass()); - assertEquals(0, ((SimpleTransformation<?>) predicated.delegate).magicNumber); - assertEquals(0, ((TestPredicate<?>) predicated.predicate).param); + stage.close(); } @Test @@ -381,7 +379,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { @Override public boolean test(R record) { - return false; + return record.kafkaPartition() == param; } @Override @@ -445,8 +443,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { props.put(prefix + "type", HasDuplicateConfigTransformation.class.getName()); ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(), props, false); assertEnrichedConfigDef(def, prefix, HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN); - assertEnrichedConfigDef(def, prefix, PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING); - assertEnrichedConfigDef(def, prefix, PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN); + assertEnrichedConfigDef(def, prefix, TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING); + assertEnrichedConfigDef(def, prefix, TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN); } private static void assertEnrichedConfigDef(ConfigDef def, String prefix, String keyName, ConfigDef.Type expectedType) { @@ -460,9 +458,9 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> { private static final String MUST_EXIST_KEY = "must.exist.key"; private static final ConfigDef CONFIG_DEF = new ConfigDef() // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error. - .define(PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "fake") + .define(TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "fake") // this configDef is duplicate. It should be removed automatically so as to avoid duplicate config error. - .define(PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake") + .define(TransformationStage.NEGATE_CONFIG, ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake") // this configDef should appear if above duplicate configDef is removed without any error .define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true, ConfigDef.Importance.MEDIUM, "this key must exist"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java index 4de13128dbc..3a0090f2267 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java @@ -502,7 +502,7 @@ public class ErrorHandlingTaskTest { converter.configure(oo); TransformationChain<SinkRecord> sinkTransforms = - new TransformationChain<>(singletonList(new FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator); + new TransformationChain<>(singletonList(new TransformationStage<>(new FaultyPassthrough<SinkRecord>())), retryWithToleranceOperator); workerSinkTask = new WorkerSinkTask( taskId, sinkTask, statusListener, initialState, workerConfig, @@ -532,7 +532,7 @@ public class ErrorHandlingTaskTest { } private void createSourceTask(TargetState initialState, RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) { - TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()), retryWithToleranceOperator); + TransformationChain<SourceRecord> sourceTransforms = new TransformationChain<>(singletonList(new TransformationStage<>(new FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator); workerSourceTask = spy(new WorkerSourceTask( taskId, sourceTask, statusListener, initialState, converter, diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java similarity index 63% rename from connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java rename to connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java index 8bce328817a..d31e8563f8c 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java @@ -17,13 +17,18 @@ package org.apache.kafka.connect.runtime; import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.predicates.Predicate; import org.junit.Test; import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; -public class PredicatedTransformationTest { +public class TransformationStageTest { private final SourceRecord initial = new SourceRecord(singletonMap("initial", 1), null, null, null, null); private final SourceRecord transformed = new SourceRecord(singletonMap("transformed", 2), null, null, null, null); @@ -39,17 +44,21 @@ public class PredicatedTransformationTest { private void applyAndAssert(boolean predicateResult, boolean negate, SourceRecord expectedResult) { - SamplePredicate predicate = new SamplePredicate(predicateResult); - SampleTransformation<SourceRecord> predicatedTransform = new SampleTransformation<>(transformed); - PredicatedTransformation<SourceRecord> pt = new PredicatedTransformation<>( + @SuppressWarnings("unchecked") + Predicate<SourceRecord> predicate = mock(Predicate.class); + when(predicate.test(any())).thenReturn(predicateResult); + @SuppressWarnings("unchecked") + Transformation<SourceRecord> transformation = mock(Transformation.class); + when(transformation.apply(any())).thenReturn(transformed); + TransformationStage<SourceRecord> stage = new TransformationStage<>( predicate, negate, - predicatedTransform); + transformation); - assertEquals(expectedResult, pt.apply(initial)); + assertEquals(expectedResult, stage.apply(initial)); - pt.close(); - assertTrue(predicate.closed); - assertTrue(predicatedTransform.closed); + stage.close(); + verify(predicate).close(); + verify(transformation).close(); } } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index 59cf83ca9ae..63e7c27c92b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -387,8 +387,7 @@ public class ConnectorPluginsResourceTest { public void testListAllPlugins() { Set<Class<?>> excludes = Stream.of( ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES, - ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES, - ConnectorPluginsResource.TRANSFORM_EXCLUDES + ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES ).flatMap(Collection::stream) .collect(Collectors.toSet()); Set<PluginInfo> expectedConnectorPlugins = Stream.of( diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java index feb0e5f5ac9..af11782a041 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java @@ -19,6 +19,7 @@ package org.apache.kafka.connect.util; import org.apache.kafka.clients.admin.NewTopic; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.runtime.TransformationStage; import org.apache.kafka.connect.runtime.SourceConnectorConfig; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.kafka.connect.runtime.distributed.DistributedConfig; @@ -26,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.storage.StringConverter; import org.apache.kafka.connect.transforms.Cast; import org.apache.kafka.connect.transforms.RegexRouter; -import org.apache.kafka.connect.transforms.Transformation; import org.junit.Before; import org.junit.Test; @@ -516,9 +516,9 @@ public class TopicCreationTest { topicCreation.addTopic(FOO_TOPIC); assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC)); - List<Transformation<SourceRecord>> transformations = sourceConfig.transformations(); - assertEquals(1, transformations.size()); - Cast<SourceRecord> xform = (Cast<SourceRecord>) transformations.get(0); + List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(); + assertEquals(1, transformationStages.size()); + TransformationStage<SourceRecord> xform = transformationStages.get(0); SourceRecord transformed = xform.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42)); assertEquals(Schema.Type.INT8, transformed.valueSchema().type()); assertEquals((byte) 42, transformed.value()); @@ -623,15 +623,15 @@ public class TopicCreationTest { assertEquals(barPartitions, barTopicSpec.numPartitions()); assertThat(barTopicSpec.configs(), is(barTopicProps)); - List<Transformation<SourceRecord>> transformations = sourceConfig.transformations(); - assertEquals(2, transformations.size()); + List<TransformationStage<SourceRecord>> transformationStages = sourceConfig.transformationStages(); + assertEquals(2, transformationStages.size()); - Cast<SourceRecord> castXForm = (Cast<SourceRecord>) transformations.get(0); + TransformationStage<SourceRecord> castXForm = transformationStages.get(0); SourceRecord transformed = castXForm.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42)); assertEquals(Schema.Type.INT8, transformed.valueSchema().type()); assertEquals((byte) 42, transformed.value()); - RegexRouter<SourceRecord> regexRouterXForm = (RegexRouter<SourceRecord>) transformations.get(1); + TransformationStage<SourceRecord> regexRouterXForm = transformationStages.get(1); transformed = regexRouterXForm.apply(new SourceRecord(null, null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42)); assertEquals("prefix-topic", transformed.topic()); }