dlg99 opened a new pull request #9927:
URL: https://github.com/apache/pulsar/pull/9927
### Motivation
Provide a way to use a Kafka Connect sink as a Pulsar sink, for cases such as:
- a company has a custom Kafka sink and wants to try Pulsar out
- no corresponding Pulsar sink exists
### Modifications
Added `KafkaConnectSink`. The Kafka schema is autodetected from the key/value
itself for primitive values, with an option to unwrap `KeyValue`.
`GenericRecord` support is TBD and depends on the changes Enrico is working on.

Added a profile to pulsar-io/kafka-connect-adaptor to build a nar with the
Kafka Connect connector included.
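
To illustrate what "autodetected from key/value itself for primitive values" could look like, here is a minimal sketch. It is not the code from this PR: the class name `SchemaAutodetectSketch`, the `detect` method, and the fallback to a configured default are all assumptions made for illustration. The returned strings mirror the names of the constants on Kafka Connect's `Schema` interface (for example `STRING_SCHEMA`, `BYTES_SCHEMA`), which are also the names used for `defaultKeySchema` and `defaultValueSchema` in the sink config below.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; not the KafkaConnectSink implementation.
final class SchemaAutodetectSketch {

    // Picks a Kafka Connect schema name from the runtime type of a value.
    // Falls back to defaultSchemaName (an assumed behavior) when the value
    // is null or not one of the primitive types handled here.
    static String detect(Object value, String defaultSchemaName) {
        if (value instanceof String) return "STRING_SCHEMA";
        if (value instanceof byte[] || value instanceof ByteBuffer) return "BYTES_SCHEMA";
        if (value instanceof Byte) return "INT8_SCHEMA";
        if (value instanceof Short) return "INT16_SCHEMA";
        if (value instanceof Integer) return "INT32_SCHEMA";
        if (value instanceof Long) return "INT64_SCHEMA";
        if (value instanceof Float) return "FLOAT32_SCHEMA";
        if (value instanceof Double) return "FLOAT64_SCHEMA";
        if (value instanceof Boolean) return "BOOLEAN_SCHEMA";
        return defaultSchemaName;
    }

    public static void main(String[] args) {
        // A string payload is detected regardless of the configured default.
        System.out.println(detect("hello-pulsar", "BYTES_SCHEMA"));
        // A null value cannot be inspected, so the default is used.
        System.out.println(detect(null, "BYTES_SCHEMA"));
    }
}
```

In a real adaptor the result would be a `org.apache.kafka.connect.data.Schema` instance rather than a string; strings are used here only to keep the sketch free of external dependencies.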
### Verifying this change
- [ ] Make sure that the change passes the CI checks.
This change added tests and can be verified as follows:
- Added unit tests.
- Tested locally:
  - Started Pulsar standalone with `bin/pulsar standalone`.
  - Built the nar with `mvn -f pulsar-io/kafka-connect-adaptor-nar/pom.xml clean package -DskipTests -P packageKafkaConnect` to include Kafka's connect-file sink in the nar.

Ran the test nar as
```
bin/pulsar-admin sinks localrun -a \
  ./pulsar-io/kafka-connect-adaptor-nar/target/pulsar-io-kafka-connect-adaptor-nar-2.8.0-SNAPSHOT.nar \
  --name kwrap --namespace public/default/ktest --parallelism 1 -i my-topic \
  --sink-config-file ~/sink.yaml
```
with
```
$ cat ~/sink.yaml
configs:
  "topic": "test"
  "offsetStorageTopic": "kafka-connect-sink-offset"
  "pulsarServiceUrl": "pulsar://localhost:6650/"
  "kafkaConnectorSinkClass": "org.apache.kafka.connect.file.FileStreamSinkConnector"
  "defaultKeySchema": "STRING_SCHEMA"
  "defaultValueSchema": "BYTES_SCHEMA"
  "kafkaConnectorConfigProperties":
    "file": "/tmp/sink_test.out"
```
Set the topic schema as
```
bin/pulsar-admin schemas upload --filename ~/schema.json my-topic
$ cat ~/schema.json
{
  "type": "STRING",
  "schema": "",
  "properties": {
    "key1": "value1"
  }
}
```
Produced a message as
```
bin/pulsar-client produce my-topic --messages "hello-pulsar"
```
and got
```
$ cat /tmp/sink_test.out
hello-pulsar
```
### Does this pull request potentially affect one of the following parts:
*If `yes` was chosen, please highlight the changes*
- Dependencies (does it add or upgrade a dependency): (no)
- The public API: (no)
- The schema: (no)
- The default values of configurations: (no)
- The wire protocol: (no)
- The rest endpoints: (no)
- The admin cli options: (no)
- Anything that affects deployment: (no)
### Documentation
- Does this pull request introduce a new feature? (yes)
- If yes, how is the feature documented? (not documented yet)
- If a feature is not documented yet in this PR, please create a followup
issue for adding the documentation
TBD following review