lhotari commented on code in PR #16179:
URL: https://github.com/apache/pulsar/pull/16179#discussion_r2774592647
##########
pulsar-io/cassandra/src/main/java/org/apache/pulsar/io/cassandra/CassandraStringSink.java:
##########
@@ -19,23 +19,13 @@
package org.apache.pulsar.io.cassandra;
import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.KeyValue;
-import org.apache.pulsar.io.core.annotations.Connector;
-import org.apache.pulsar.io.core.annotations.IOType;
+import org.apache.pulsar.io.cassandra.util.RecordWrapper;
+import org.apache.pulsar.io.cassandra.util.StringRecordWrapper;
+
+public class CassandraStringSink extends CassandraSink<String> {
-/**
- * Cassandra sink that treats incoming messages on the input topic as Strings
- * and write identical key/value pairs.
- */
-@Connector(
- name = "cassandra",
- type = IOType.SINK,
- help = "The CassandraStringSink is used for moving messages from Pulsar to Cassandra.",
- configClass = CassandraSinkConfig.class)
-public class CassandraStringSink extends CassandraAbstractSink<String, String> {
@Override
- public KeyValue<String, String> extractKeyValue(Record<byte[]> record) {
- String key = record.getKey().orElseGet(() -> new String(record.getValue()));
- return new KeyValue<>(key, new String(record.getValue()));
+ RecordWrapper<String> wrapRecord(Record<String> record) {
+ return new StringRecordWrapper(record.getValue());
}
-}
Review Comment:
The behavior of the existing CassandraStringSink shouldn't be modified, since
changing it would break backwards compatibility.
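
For reference, here is a minimal sketch of the behavior the removed `extractKeyValue` implemented, which existing users may depend on: the record key is used when present, otherwise the value doubles as the key. The class name `LegacyKeyValueSketch`, the method `legacyExtract`, and the use of `Map.Entry` in place of Pulsar's `KeyValue` are hypothetical stand-ins so the snippet runs without Pulsar on the classpath. It also assumes UTF-8 decoding, whereas the original code used `new String(byte[])` with the platform default charset.

```java
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.Optional;

public class LegacyKeyValueSketch {

    // Hypothetical stand-in for the removed extractKeyValue(Record<byte[]>):
    // use the record key if present, otherwise fall back to the decoded value.
    // Assumption: UTF-8 decoding (the original relied on the default charset).
    public static Map.Entry<String, String> legacyExtract(Optional<String> key, byte[] value) {
        String decoded = new String(value, StandardCharsets.UTF_8);
        return new SimpleEntry<>(key.orElse(decoded), decoded);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        // Keyed record: the key is kept as-is.
        System.out.println(legacyExtract(Optional.of("k1"), payload));
        // Unkeyed record: the value is written as both key and value.
        System.out.println(legacyExtract(Optional.empty(), payload));
    }
}
```

A replacement that only forwards `record.getValue()` no longer produces this key/value pair, so one option is to keep the existing class as it is and introduce the new behavior under a separate class name.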