This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new f586fa59d3f KAFKA-14671: Refactor PredicatedTransformation to not
implement Transformation (#13184)
f586fa59d3f is described below
commit f586fa59d3f938e04bda4e8143ddb1c4310eaf78
Author: Greg Harris <[email protected]>
AuthorDate: Tue Feb 28 08:23:19 2023 -0800
KAFKA-14671: Refactor PredicatedTransformation to not implement
Transformation (#13184)
Reviewers: Christo Lolov <[email protected]>, Yash Mayya
<[email protected]>, Chris Egerton <[email protected]>
---
.../kafka/connect/runtime/ConnectorConfig.java | 28 +++++-----
.../kafka/connect/runtime/TransformationChain.java | 27 +++++-----
...ransformation.java => TransformationStage.java} | 49 ++++++++----------
.../org/apache/kafka/connect/runtime/Worker.java | 6 +--
.../rest/resources/ConnectorPluginsResource.java | 9 +---
.../kafka/connect/runtime/ConnectorConfigTest.java | 60 +++++++++++-----------
.../connect/runtime/ErrorHandlingTaskTest.java | 4 +-
...ationTest.java => TransformationStageTest.java} | 29 +++++++----
.../resources/ConnectorPluginsResourceTest.java | 3 +-
.../kafka/connect/util/TopicCreationTest.java | 16 +++---
10 files changed, 113 insertions(+), 118 deletions(-)
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 485dda9b98b..40b7c0a1462 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -268,12 +268,14 @@ public class ConnectorConfig extends AbstractConfig {
}
/**
- * Returns the initialized list of {@link Transformation} which are
specified in {@link #TRANSFORMS_CONFIG}.
+ * Returns the initialized list of {@link TransformationStage} which apply
the
+ * {@link Transformation transformations} and {@link Predicate predicates}
+ * as they are specified in the {@link #TRANSFORMS_CONFIG} and {@link
#PREDICATES_CONFIG}
*/
- public <R extends ConnectRecord<R>> List<Transformation<R>>
transformations() {
+ public <R extends ConnectRecord<R>> List<TransformationStage<R>>
transformationStages() {
final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
- final List<Transformation<R>> transformations = new
ArrayList<>(transformAliases.size());
+ final List<TransformationStage<R>> transformations = new
ArrayList<>(transformAliases.size());
for (String alias : transformAliases) {
final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
@@ -281,17 +283,17 @@ public class ConnectorConfig extends AbstractConfig {
@SuppressWarnings("unchecked")
final Transformation<R> transformation =
Utils.newInstance(getClass(prefix + "type"), Transformation.class);
Map<String, Object> configs = originalsWithPrefix(prefix);
- Object predicateAlias =
configs.remove(PredicatedTransformation.PREDICATE_CONFIG);
- Object negate =
configs.remove(PredicatedTransformation.NEGATE_CONFIG);
+ Object predicateAlias =
configs.remove(TransformationStage.PREDICATE_CONFIG);
+ Object negate =
configs.remove(TransformationStage.NEGATE_CONFIG);
transformation.configure(configs);
if (predicateAlias != null) {
String predicatePrefix = PREDICATES_PREFIX +
predicateAlias + ".";
@SuppressWarnings("unchecked")
Predicate<R> predicate =
Utils.newInstance(getClass(predicatePrefix + "type"), Predicate.class);
predicate.configure(originalsWithPrefix(predicatePrefix));
- transformations.add(new
PredicatedTransformation<>(predicate, negate == null ? false :
Boolean.parseBoolean(negate.toString()), transformation));
+ transformations.add(new TransformationStage<>(predicate,
negate == null ? false : Boolean.parseBoolean(negate.toString()),
transformation));
} else {
- transformations.add(transformation);
+ transformations.add(new
TransformationStage<>(transformation));
}
} catch (Exception e) {
throw new ConnectException(e);
@@ -321,9 +323,9 @@ public class ConnectorConfig extends AbstractConfig {
protected ConfigDef initialConfigDef() {
// All Transformations get these config parameters implicitly
return super.initialConfigDef()
- .define(PredicatedTransformation.PREDICATE_CONFIG,
Type.STRING, "", Importance.MEDIUM,
+ .define(TransformationStage.PREDICATE_CONFIG,
Type.STRING, null, Importance.MEDIUM,
"The alias of a predicate used to determine
whether to apply this transformation.")
- .define(PredicatedTransformation.NEGATE_CONFIG,
Type.BOOLEAN, false, Importance.MEDIUM,
+ .define(TransformationStage.NEGATE_CONFIG,
Type.BOOLEAN, false, Importance.MEDIUM,
"Whether the configured predicate should be
negated.");
}
@@ -332,8 +334,8 @@ public class ConnectorConfig extends AbstractConfig {
return super.configDefsForClass(typeConfig)
.filter(entry -> {
// The implicit parameters mask any from the
transformer with the same name
- if
(PredicatedTransformation.PREDICATE_CONFIG.equals(entry.getKey())
- ||
PredicatedTransformation.NEGATE_CONFIG.equals(entry.getKey())) {
+ if
(TransformationStage.PREDICATE_CONFIG.equals(entry.getKey())
+ ||
TransformationStage.NEGATE_CONFIG.equals(entry.getKey())) {
log.warn("Transformer config {} is masked by
implicit config of that name",
entry.getKey());
return false;
@@ -350,8 +352,8 @@ public class ConnectorConfig extends AbstractConfig {
@Override
protected void validateProps(String prefix) {
- String prefixedNegate = prefix +
PredicatedTransformation.NEGATE_CONFIG;
- String prefixedPredicate = prefix +
PredicatedTransformation.PREDICATE_CONFIG;
+ String prefixedNegate = prefix +
TransformationStage.NEGATE_CONFIG;
+ String prefixedPredicate = prefix +
TransformationStage.PREDICATE_CONFIG;
if (props.containsKey(prefixedNegate) &&
!props.containsKey(prefixedPredicate)) {
throw new ConfigException("Config '" + prefixedNegate + "'
was provided " +
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
index 984c1422572..b130a226f5f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -19,7 +19,6 @@ package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator;
import org.apache.kafka.connect.runtime.errors.Stage;
-import org.apache.kafka.connect.transforms.Transformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -34,24 +33,24 @@ import java.util.StringJoiner;
public class TransformationChain<R extends ConnectRecord<R>> implements
AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(TransformationChain.class);
- private final List<Transformation<R>> transformations;
+ private final List<TransformationStage<R>> transformationStages;
private final RetryWithToleranceOperator retryWithToleranceOperator;
- public TransformationChain(List<Transformation<R>> transformations,
RetryWithToleranceOperator retryWithToleranceOperator) {
- this.transformations = transformations;
+ public TransformationChain(List<TransformationStage<R>>
transformationStages, RetryWithToleranceOperator retryWithToleranceOperator) {
+ this.transformationStages = transformationStages;
this.retryWithToleranceOperator = retryWithToleranceOperator;
}
public R apply(R record) {
- if (transformations.isEmpty()) return record;
+ if (transformationStages.isEmpty()) return record;
- for (final Transformation<R> transformation : transformations) {
+ for (final TransformationStage<R> transformationStage :
transformationStages) {
final R current = record;
log.trace("Applying transformation {} to {}",
- transformation.getClass().getName(), record);
+ transformationStage.transformClass().getName(), record);
// execute the operation
- record = retryWithToleranceOperator.execute(() ->
transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass());
+ record = retryWithToleranceOperator.execute(() ->
transformationStage.apply(current), Stage.TRANSFORMATION,
transformationStage.transformClass());
if (record == null) break;
}
@@ -61,8 +60,8 @@ public class TransformationChain<R extends ConnectRecord<R>>
implements AutoClos
@Override
public void close() {
- for (Transformation<R> transformation : transformations) {
- transformation.close();
+ for (TransformationStage<R> transformationStage :
transformationStages) {
+ transformationStage.close();
}
}
@@ -71,18 +70,18 @@ public class TransformationChain<R extends
ConnectRecord<R>> implements AutoClos
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
TransformationChain<?> that = (TransformationChain<?>) o;
- return Objects.equals(transformations, that.transformations);
+ return Objects.equals(transformationStages, that.transformationStages);
}
@Override
public int hashCode() {
- return Objects.hash(transformations);
+ return Objects.hash(transformationStages);
}
public String toString() {
StringJoiner chain = new StringJoiner(", ", getClass().getName() +
"{", "}");
- for (Transformation<R> transformation : transformations) {
- chain.add(transformation.getClass().getName());
+ for (TransformationStage<R> transformationStage :
transformationStages) {
+ chain.add(transformationStage.transformClass().getName());
}
return chain.toString();
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
similarity index 56%
rename from
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
rename to
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
index 446db5b2f32..3831730ad8f 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PredicatedTransformation.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationStage.java
@@ -16,65 +16,60 @@
*/
package org.apache.kafka.connect.runtime;
-import java.util.Map;
-import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.connector.ConnectRecord;
-import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
/**
- * Decorator for a {@link Transformation} which applies the delegate only when
a
- * {@link Predicate} is true (or false, according to {@code negate}).
+ * Wrapper for a {@link Transformation} and corresponding optional {@link
Predicate}
+ * which applies the transformation when the {@link Predicate} is true (or
false, according to {@code negate}).
+ * If no {@link Predicate} is provided, the transformation will be
unconditionally applied.
* @param <R> The type of record (must be an implementation of {@link
ConnectRecord})
*/
-public class PredicatedTransformation<R extends ConnectRecord<R>> implements
Transformation<R> {
+public class TransformationStage<R extends ConnectRecord<R>> implements
AutoCloseable {
static final String PREDICATE_CONFIG = "predicate";
static final String NEGATE_CONFIG = "negate";
- final Predicate<R> predicate;
- final Transformation<R> delegate;
- final boolean negate;
+ private final Predicate<R> predicate;
+ private final Transformation<R> transformation;
+ private final boolean negate;
- PredicatedTransformation(Predicate<R> predicate, boolean negate,
Transformation<R> delegate) {
+ TransformationStage(Transformation<R> transformation) {
+ this(null, false, transformation);
+ }
+
+ TransformationStage(Predicate<R> predicate, boolean negate,
Transformation<R> transformation) {
this.predicate = predicate;
this.negate = negate;
- this.delegate = delegate;
+ this.transformation = transformation;
}
- @Override
- public void configure(Map<String, ?> configs) {
- throw new ConnectException(PredicatedTransformation.class.getName() +
".configure() " +
- "should never be called directly.");
+ public Class<? extends Transformation<R>> transformClass() {
+ @SuppressWarnings("unchecked")
+ Class<? extends Transformation<R>> transformClass = (Class<? extends
Transformation<R>>) transformation.getClass();
+ return transformClass;
}
- @Override
public R apply(R record) {
- if (negate ^ predicate.test(record)) {
- return delegate.apply(record);
+ if (predicate == null || negate ^ predicate.test(record)) {
+ return transformation.apply(record);
}
return record;
}
- @Override
- public ConfigDef config() {
- throw new ConnectException(PredicatedTransformation.class.getName() +
".config() " +
- "should never be called directly.");
- }
-
@Override
public void close() {
- Utils.closeQuietly(delegate, "predicated transformation");
+ Utils.closeQuietly(transformation, "transformation");
Utils.closeQuietly(predicate, "predicate");
}
@Override
public String toString() {
- return "PredicatedTransformation{" +
+ return "TransformationStage{" +
"predicate=" + predicate +
- ", delegate=" + delegate +
+ ", transformation=" + transformation +
", negate=" + negate +
'}';
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index f966ea4cb55..8f9f727e25e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -1253,7 +1253,7 @@ public class Worker {
Class<? extends Connector> connectorClass,
RetryWithToleranceOperator
retryWithToleranceOperator) {
- TransformationChain<SinkRecord> transformationChain = new
TransformationChain<>(connectorConfig.<SinkRecord>transformations(),
retryWithToleranceOperator);
+ TransformationChain<SinkRecord> transformationChain = new
TransformationChain<>(connectorConfig.<SinkRecord>transformationStages(),
retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins,
connectorConfig.originalsStrings());
retryWithToleranceOperator.reporters(sinkTaskReporters(id,
sinkConfig, errorHandlingMetrics, connectorClass));
@@ -1297,7 +1297,7 @@ public class Worker {
SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins,
connectorConfig.originalsStrings(),
config.topicCreationEnable());
retryWithToleranceOperator.reporters(sourceTaskReporters(id,
sourceConfig, errorHandlingMetrics));
- TransformationChain<SourceRecord> transformationChain = new
TransformationChain<>(sourceConfig.<SourceRecord>transformations(),
retryWithToleranceOperator);
+ TransformationChain<SourceRecord> transformationChain = new
TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(),
retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
Map<String, Object> producerProps =
baseProducerConfigs(id.connector(), "connector-producer-" + id, config,
sourceConfig, connectorClass,
@@ -1365,7 +1365,7 @@ public class Worker {
SourceConnectorConfig sourceConfig = new
SourceConnectorConfig(plugins,
connectorConfig.originalsStrings(),
config.topicCreationEnable());
retryWithToleranceOperator.reporters(sourceTaskReporters(id,
sourceConfig, errorHandlingMetrics));
- TransformationChain<SourceRecord> transformationChain = new
TransformationChain<>(sourceConfig.<SourceRecord>transformations(),
retryWithToleranceOperator);
+ TransformationChain<SourceRecord> transformationChain = new
TransformationChain<>(sourceConfig.<SourceRecord>transformationStages(),
retryWithToleranceOperator);
log.info("Initializing: {}", transformationChain);
Map<String, Object> producerProps =
exactlyOnceSourceTaskProducerConfigs(
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 05b8375183c..ad8ff00cb96 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -20,7 +20,6 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
-import org.apache.kafka.connect.runtime.PredicatedTransformation;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.PluginType;
import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
@@ -34,7 +33,6 @@ import org.apache.kafka.connect.tools.MockSourceConnector;
import org.apache.kafka.connect.tools.SchemaSourceConnector;
import org.apache.kafka.connect.tools.VerifiableSinkConnector;
import org.apache.kafka.connect.tools.VerifiableSourceConnector;
-import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.FutureCallback;
import javax.ws.rs.BadRequestException;
@@ -79,11 +77,6 @@ public class ConnectorPluginsResource implements
ConnectResource {
SchemaSourceConnector.class
);
- @SuppressWarnings({"unchecked", "rawtypes"})
- static final List<Class<? extends Transformation<?>>> TRANSFORM_EXCLUDES =
Collections.singletonList(
- (Class) PredicatedTransformation.class
- );
-
public ConnectorPluginsResource(Herder herder) {
this.herder = herder;
this.connectorPlugins = new ArrayList<>();
@@ -92,7 +85,7 @@ public class ConnectorPluginsResource implements
ConnectResource {
// TODO: improve once plugins are allowed to be added/removed during
runtime.
addConnectorPlugins(herder.plugins().sinkConnectors(),
SINK_CONNECTOR_EXCLUDES);
addConnectorPlugins(herder.plugins().sourceConnectors(),
SOURCE_CONNECTOR_EXCLUDES);
- addConnectorPlugins(herder.plugins().transformations(),
TRANSFORM_EXCLUDES);
+ addConnectorPlugins(herder.plugins().transformations(),
Collections.emptySet());
addConnectorPlugins(herder.plugins().predicates(),
Collections.emptySet());
addConnectorPlugins(herder.plugins().converters(),
Collections.emptySet());
addConnectorPlugins(herder.plugins().headerConverters(),
Collections.emptySet());
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
index 4abdbeaa4e5..d8c071e6c2f 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.connector.Connector;
import org.apache.kafka.connect.runtime.isolation.PluginDesc;
import org.apache.kafka.connect.runtime.isolation.Plugins;
+import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.junit.Test;
@@ -48,6 +49,8 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
}
};
+ private static final SinkRecord DUMMY_RECORD = new SinkRecord(null, 0,
null, null, null, null, 0L);
+
public static abstract class TestConnector extends Connector {
}
@@ -62,7 +65,7 @@ public class ConnectorConfigTest<R extends ConnectRecord<R>> {
@Override
public R apply(R record) {
- return null;
+ return record.newRecord(null, magicNumber, null, null, null, null,
0L);
}
@Override
@@ -147,10 +150,11 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put("transforms.a.type", SimpleTransformation.class.getName());
props.put("transforms.a.magic.number", "42");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
- final List<Transformation<R>> transformations =
config.transformations();
- assertEquals(1, transformations.size());
- final SimpleTransformation<R> xform = (SimpleTransformation<R>)
transformations.get(0);
- assertEquals(42, xform.magicNumber);
+ final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages();
+ assertEquals(1, transformationStages.size());
+ final TransformationStage<SinkRecord> stage =
transformationStages.get(0);
+ assertEquals(SimpleTransformation.class, stage.transformClass());
+ assertEquals(42,
stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
}
@Test
@@ -175,10 +179,10 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put("transforms.b.type", SimpleTransformation.class.getName());
props.put("transforms.b.magic.number", "84");
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
- final List<Transformation<R>> transformations =
config.transformations();
- assertEquals(2, transformations.size());
- assertEquals(42, ((SimpleTransformation<R>)
transformations.get(0)).magicNumber);
- assertEquals(84, ((SimpleTransformation<R>)
transformations.get(1)).magicNumber);
+ final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages();
+ assertEquals(2, transformationStages.size());
+ assertEquals(42,
transformationStages.get(0).apply(DUMMY_RECORD).kafkaPartition().intValue());
+ assertEquals(84,
transformationStages.get(1).apply(DUMMY_RECORD).kafkaPartition().intValue());
}
@Test
@@ -246,7 +250,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
- assertPredicatedTransform(props, true);
+ assertTransformationStageWithPredicate(props, true);
}
@Test
@@ -261,7 +265,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put("predicates", "my-pred");
props.put("predicates.my-pred.type", TestPredicate.class.getName());
props.put("predicates.my-pred.int", "84");
- assertPredicatedTransform(props, false);
+ assertTransformationStageWithPredicate(props, false);
}
@Test
@@ -280,25 +284,19 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
assertTrue(e.getMessage().contains("Predicate is abstract and cannot
be created"));
}
- private void assertPredicatedTransform(Map<String, String> props, boolean
expectedNegated) {
+ private void assertTransformationStageWithPredicate(Map<String, String>
props, boolean expectedNegated) {
final ConnectorConfig config = new ConnectorConfig(MOCK_PLUGINS,
props);
- final List<Transformation<R>> transformations =
config.transformations();
- assertEquals(1, transformations.size());
- assertTrue(transformations.get(0) instanceof PredicatedTransformation);
- PredicatedTransformation<?> predicated = (PredicatedTransformation<?>)
transformations.get(0);
-
- assertEquals(expectedNegated, predicated.negate);
-
- assertTrue(predicated.delegate instanceof
ConnectorConfigTest.SimpleTransformation);
- assertEquals(42, ((SimpleTransformation<?>)
predicated.delegate).magicNumber);
+ final List<TransformationStage<SinkRecord>> transformationStages =
config.transformationStages();
+ assertEquals(1, transformationStages.size());
+ TransformationStage<SinkRecord> stage = transformationStages.get(0);
- assertTrue(predicated.predicate instanceof
ConnectorConfigTest.TestPredicate);
- assertEquals(84, ((TestPredicate<?>) predicated.predicate).param);
+ assertEquals(expectedNegated ? 42 : 0,
stage.apply(DUMMY_RECORD).kafkaPartition().intValue());
- predicated.close();
+ SinkRecord matchingRecord = DUMMY_RECORD.newRecord(null, 84, null,
null, null, null, 0L);
+ assertEquals(expectedNegated ? 84 : 42,
stage.apply(matchingRecord).kafkaPartition().intValue());
+ assertEquals(SimpleTransformation.class, stage.transformClass());
- assertEquals(0, ((SimpleTransformation<?>)
predicated.delegate).magicNumber);
- assertEquals(0, ((TestPredicate<?>) predicated.predicate).param);
+ stage.close();
}
@Test
@@ -381,7 +379,7 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
@Override
public boolean test(R record) {
- return false;
+ return record.kafkaPartition() == param;
}
@Override
@@ -445,8 +443,8 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
props.put(prefix + "type",
HasDuplicateConfigTransformation.class.getName());
ConfigDef def = ConnectorConfig.enrich(MOCK_PLUGINS, new ConfigDef(),
props, false);
assertEnrichedConfigDef(def, prefix,
HasDuplicateConfigTransformation.MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN);
- assertEnrichedConfigDef(def, prefix,
PredicatedTransformation.PREDICATE_CONFIG, ConfigDef.Type.STRING);
- assertEnrichedConfigDef(def, prefix,
PredicatedTransformation.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
+ assertEnrichedConfigDef(def, prefix,
TransformationStage.PREDICATE_CONFIG, ConfigDef.Type.STRING);
+ assertEnrichedConfigDef(def, prefix,
TransformationStage.NEGATE_CONFIG, ConfigDef.Type.BOOLEAN);
}
private static void assertEnrichedConfigDef(ConfigDef def, String prefix,
String keyName, ConfigDef.Type expectedType) {
@@ -460,9 +458,9 @@ public class ConnectorConfigTest<R extends
ConnectRecord<R>> {
private static final String MUST_EXIST_KEY = "must.exist.key";
private static final ConfigDef CONFIG_DEF = new ConfigDef()
// this configDef is duplicate. It should be removed
automatically so as to avoid duplicate config error.
- .define(PredicatedTransformation.PREDICATE_CONFIG,
ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
"fake")
+ .define(TransformationStage.PREDICATE_CONFIG,
ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM,
"fake")
// this configDef is duplicate. It should be removed
automatically so as to avoid duplicate config error.
- .define(PredicatedTransformation.NEGATE_CONFIG,
ConfigDef.Type.INT, 123, ConfigDef.Importance.MEDIUM, "fake")
+ .define(TransformationStage.NEGATE_CONFIG, ConfigDef.Type.INT,
123, ConfigDef.Importance.MEDIUM, "fake")
// this configDef should appear if above duplicate configDef
is removed without any error
.define(MUST_EXIST_KEY, ConfigDef.Type.BOOLEAN, true,
ConfigDef.Importance.MEDIUM, "this key must exist");
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 4de13128dbc..3a0090f2267 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -502,7 +502,7 @@ public class ErrorHandlingTaskTest {
converter.configure(oo);
TransformationChain<SinkRecord> sinkTransforms =
- new TransformationChain<>(singletonList(new
FaultyPassthrough<SinkRecord>()), retryWithToleranceOperator);
+ new TransformationChain<>(singletonList(new
TransformationStage<>(new FaultyPassthrough<SinkRecord>())),
retryWithToleranceOperator);
workerSinkTask = new WorkerSinkTask(
taskId, sinkTask, statusListener, initialState, workerConfig,
@@ -532,7 +532,7 @@ public class ErrorHandlingTaskTest {
}
private void createSourceTask(TargetState initialState,
RetryWithToleranceOperator retryWithToleranceOperator, Converter converter) {
- TransformationChain<SourceRecord> sourceTransforms = new
TransformationChain<>(singletonList(new FaultyPassthrough<SourceRecord>()),
retryWithToleranceOperator);
+ TransformationChain<SourceRecord> sourceTransforms = new
TransformationChain<>(singletonList(new TransformationStage<>(new
FaultyPassthrough<SourceRecord>())), retryWithToleranceOperator);
workerSourceTask = spy(new WorkerSourceTask(
taskId, sourceTask, statusListener, initialState, converter,
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
similarity index 63%
rename from
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java
rename to
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
index 8bce328817a..d31e8563f8c 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/PredicatedTransformationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/TransformationStageTest.java
@@ -17,13 +17,18 @@
package org.apache.kafka.connect.runtime;
import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.transforms.predicates.Predicate;
import org.junit.Test;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-public class PredicatedTransformationTest {
+public class TransformationStageTest {
private final SourceRecord initial = new
SourceRecord(singletonMap("initial", 1), null, null, null, null);
private final SourceRecord transformed = new
SourceRecord(singletonMap("transformed", 2), null, null, null, null);
@@ -39,17 +44,21 @@ public class PredicatedTransformationTest {
private void applyAndAssert(boolean predicateResult, boolean negate,
SourceRecord expectedResult) {
- SamplePredicate predicate = new SamplePredicate(predicateResult);
- SampleTransformation<SourceRecord> predicatedTransform = new
SampleTransformation<>(transformed);
- PredicatedTransformation<SourceRecord> pt = new
PredicatedTransformation<>(
+ @SuppressWarnings("unchecked")
+ Predicate<SourceRecord> predicate = mock(Predicate.class);
+ when(predicate.test(any())).thenReturn(predicateResult);
+ @SuppressWarnings("unchecked")
+ Transformation<SourceRecord> transformation =
mock(Transformation.class);
+ when(transformation.apply(any())).thenReturn(transformed);
+ TransformationStage<SourceRecord> stage = new TransformationStage<>(
predicate,
negate,
- predicatedTransform);
+ transformation);
- assertEquals(expectedResult, pt.apply(initial));
+ assertEquals(expectedResult, stage.apply(initial));
- pt.close();
- assertTrue(predicate.closed);
- assertTrue(predicatedTransform.closed);
+ stage.close();
+ verify(predicate).close();
+ verify(transformation).close();
}
}
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index 59cf83ca9ae..63e7c27c92b 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -387,8 +387,7 @@ public class ConnectorPluginsResourceTest {
public void testListAllPlugins() {
Set<Class<?>> excludes = Stream.of(
ConnectorPluginsResource.SINK_CONNECTOR_EXCLUDES,
- ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES,
- ConnectorPluginsResource.TRANSFORM_EXCLUDES
+ ConnectorPluginsResource.SOURCE_CONNECTOR_EXCLUDES
).flatMap(Collection::stream)
.collect(Collectors.toSet());
Set<PluginInfo> expectedConnectorPlugins = Stream.of(
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
index feb0e5f5ac9..af11782a041 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/util/TopicCreationTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.util;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.runtime.TransformationStage;
import org.apache.kafka.connect.runtime.SourceConnectorConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
@@ -26,7 +27,6 @@ import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.transforms.Cast;
import org.apache.kafka.connect.transforms.RegexRouter;
-import org.apache.kafka.connect.transforms.Transformation;
import org.junit.Before;
import org.junit.Test;
@@ -516,9 +516,9 @@ public class TopicCreationTest {
topicCreation.addTopic(FOO_TOPIC);
assertFalse(topicCreation.isTopicCreationRequired(FOO_TOPIC));
- List<Transformation<SourceRecord>> transformations =
sourceConfig.transformations();
- assertEquals(1, transformations.size());
- Cast<SourceRecord> xform = (Cast<SourceRecord>) transformations.get(0);
+ List<TransformationStage<SourceRecord>> transformationStages =
sourceConfig.transformationStages();
+ assertEquals(1, transformationStages.size());
+ TransformationStage<SourceRecord> xform = transformationStages.get(0);
SourceRecord transformed = xform.apply(new SourceRecord(null, null,
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
assertEquals((byte) 42, transformed.value());
@@ -623,15 +623,15 @@ public class TopicCreationTest {
assertEquals(barPartitions, barTopicSpec.numPartitions());
assertThat(barTopicSpec.configs(), is(barTopicProps));
- List<Transformation<SourceRecord>> transformations =
sourceConfig.transformations();
- assertEquals(2, transformations.size());
+ List<TransformationStage<SourceRecord>> transformationStages =
sourceConfig.transformationStages();
+ assertEquals(2, transformationStages.size());
- Cast<SourceRecord> castXForm = (Cast<SourceRecord>)
transformations.get(0);
+ TransformationStage<SourceRecord> castXForm =
transformationStages.get(0);
SourceRecord transformed = castXForm.apply(new SourceRecord(null,
null, "topic", 0, null, null, Schema.INT8_SCHEMA, 42));
assertEquals(Schema.Type.INT8, transformed.valueSchema().type());
assertEquals((byte) 42, transformed.value());
- RegexRouter<SourceRecord> regexRouterXForm =
(RegexRouter<SourceRecord>) transformations.get(1);
+ TransformationStage<SourceRecord> regexRouterXForm =
transformationStages.get(1);
transformed = regexRouterXForm.apply(new SourceRecord(null, null,
"topic", 0, null, null, Schema.INT8_SCHEMA, 42));
assertEquals("prefix-topic", transformed.topic());
}