This is an automated email from the ASF dual-hosted git repository. ewencp pushed a commit to branch 2.1 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push: new 45b3971 MINOR: Add logging to Connect SMTs 45b3971 is described below commit 45b39710d8bfccf945f6aa1e392704ef6008339d Author: Cyrus Vafadari <cy...@confluent.io> AuthorDate: Thu Nov 29 22:29:50 2018 -0800 MINOR: Add logging to Connect SMTs Includes Update to ConnectRecord string representation to give visibility into schemas, useful in SMT tracing Author: Cyrus Vafadari <cy...@confluent.io> Reviewers: Randall Hauch <rha...@gmail.com>, Konstantine Karantasis <konstant...@confluent.io>, Ewen Cheslack-Postava <e...@confluent.io> Closes #5860 from cyrusv/cyrus-logging (cherry picked from commit 4712a3641619e86b8e6d901355088f6ae06e9f37) Signed-off-by: Ewen Cheslack-Postava <m...@ewencp.org> --- .../org/apache/kafka/connect/connector/ConnectRecord.java | 2 ++ .../apache/kafka/connect/runtime/TransformationChain.java | 13 +++++++++++++ .../main/java/org/apache/kafka/connect/runtime/Worker.java | 2 ++ .../main/java/org/apache/kafka/connect/transforms/Cast.java | 4 ++++ .../apache/kafka/connect/transforms/SetSchemaMetadata.java | 7 ++++++- 5 files changed, 27 insertions(+), 1 deletion(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java index 2c5f514..55272c2 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java @@ -140,7 +140,9 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> { "topic='" + topic + '\'' + ", kafkaPartition=" + kafkaPartition + ", key=" + key + + ", keySchema=" + keySchema.toString() + ", value=" + value + + ", valueSchema=" + valueSchema.toString() + ", timestamp=" + timestamp + ", headers=" + headers + '}'; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java index 3680905..a077a01 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java @@ -20,11 +20,15 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator; import org.apache.kafka.connect.runtime.errors.Stage; import org.apache.kafka.connect.transforms.Transformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; import java.util.Objects; +import java.util.StringJoiner; public class TransformationChain<R extends ConnectRecord<R>> { + private static final Logger log = LoggerFactory.getLogger(TransformationChain.class); private final List<Transformation<R>> transformations; private final RetryWithToleranceOperator retryWithToleranceOperator; @@ -40,6 +44,8 @@ public class TransformationChain<R extends ConnectRecord<R>> { for (final Transformation<R> transformation : transformations) { final R current = record; + log.trace("Applying transformation {} to {}", + transformation.getClass().getName(), record); // execute the operation record = retryWithToleranceOperator.execute(() -> transformation.apply(current), Stage.TRANSFORMATION, transformation.getClass()); @@ -68,4 +74,11 @@ public class TransformationChain<R extends ConnectRecord<R>> { return Objects.hash(transformations); } + public String toString() { + StringJoiner chain = new StringJoiner(", ", getClass().getName() + "{", "}"); + for (Transformation<R> transformation : transformations) { + chain.add(transformation.getClass().getName()); + } + return chain.toString(); + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index df73a43..1fd91d3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -493,6 +493,7 @@ public class Worker { if (task instanceof SourceTask) { retryWithToleranceOperator.reporters(sourceTaskReporters(id, connConfig, errorHandlingMetrics)); TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(), internalKeyConverter, internalValueConverter); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), @@ -505,6 +506,7 @@ public class Worker { time, retryWithToleranceOperator); } else if (task instanceof SinkTask) { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations(), retryWithToleranceOperator); + log.info("Initializing: {}", transformationChain); SinkConnectorConfig sinkConfig = new SinkConnectorConfig(plugins, connConfig.originalsStrings()); retryWithToleranceOperator.reporters(sinkTaskReporters(id, sinkConfig, errorHandlingMetrics)); return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, configState, metrics, keyConverter, diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java index 07ccd37..3dc6dc7 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Cast.java @@ -32,6 +32,8 @@ import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.util.SchemaUtil; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.EnumSet; import java.util.HashMap; @@ -44,6 +46,7 @@ import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; public abstract class Cast<R extends ConnectRecord<R>> implements Transformation<R> { + private static final Logger log = LoggerFactory.getLogger(Cast.class); // TODO: Currently we only support top-level field casting. Ideally we could use a dotted notation in the spec to // allow casting nested fields. @@ -156,6 +159,7 @@ public abstract class Cast<R extends ConnectRecord<R>> implements Transformation final Object origFieldValue = value.get(field); final Schema.Type targetType = casts.get(field.name()); final Object newFieldValue = targetType != null ? castValueToType(field.schema(), origFieldValue, targetType) : origFieldValue; + log.trace("Cast field '{}' from '{}' to '{}'", field.name(), origFieldValue, newFieldValue); updatedValue.put(updatedSchema.field(field.name()), newFieldValue); } return newRecord(record, updatedSchema, updatedValue); diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java index 901ac9f..fd3cbf3 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -24,12 +24,15 @@ import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema; public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements Transformation<R> { + private static final Logger log = LoggerFactory.getLogger(SetSchemaMetadata.class); public static final String OVERVIEW_DOC = "Set the schema name, version or both on the record's key (<code>" + Key.class.getName() + "</code>)" @@ -76,6 +79,8 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T isMap ? schema.keySchema() : null, isMap || isArray ? schema.valueSchema() : null ); + log.trace("Applying SetSchemaMetadata SMT. Original schema: {}, updated schema: {}", + schema, updatedSchema); return newRecord(record, updatedSchema); } @@ -149,4 +154,4 @@ public abstract class SetSchemaMetadata<R extends ConnectRecord<R>> implements T } return keyOrValue; } -} \ No newline at end of file +}