sijie commented on a change in pull request #9825:
URL: https://github.com/apache/pulsar/pull/9825#discussion_r589145567



##########
File path: distribution/io/src/assemble/io.xml
##########
@@ -47,7 +47,7 @@
 
     <file><source>${basedir}/../../pulsar-io/cassandra/target/pulsar-io-cassandra-${project.version}.nar</source></file>
    <file><source>${basedir}/../../pulsar-io/twitter/target/pulsar-io-twitter-${project.version}.nar</source></file>
-    <file><source>${basedir}/../../pulsar-io/kafka/target/pulsar-io-kafka-${project.version}.nar</source></file>

Review comment:
       I think this is a breaking change, no?

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaBytesSink.java
##########
@@ -54,4 +55,11 @@ protected Properties beforeCreateProducer(Properties props) {
     public KeyValue<String, byte[]> extractKeyValue(Record<byte[]> record) {
         return new KeyValue<>(record.getKey().orElse(null), record.getValue());
     }
+
+    @Override
+    public KeyValue<Schema, Schema> extractKeyValueSchemas(Record<byte[]> 
record) {
+        Schema keySchema = Schema.STRING_SCHEMA;

Review comment:
       Any reason why you used `STRING_SCHEMA` rather than `BYTES_SCHEMA`?
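
For illustration only, a minimal sketch of the alternative the question points at, mirroring the `extractKeyValueSchemas` override shown in the diff above; the current `STRING_SCHEMA` choice may simply follow from `extractKeyValue` returning the Pulsar record key as a `String`. This is not code from the PR:

    // Hypothetical variant, not part of the PR: declare the key as raw bytes.
    @Override
    public KeyValue<Schema, Schema> extractKeyValueSchemas(Record<byte[]> record) {
        // STRING_SCHEMA and BYTES_SCHEMA are constants on org.apache.kafka.connect.data.Schema.
        // Using BYTES_SCHEMA here assumes the key itself would also be handed over
        // as a byte[] rather than the String that record.getKey() yields today.
        return new KeyValue<>(Schema.BYTES_SCHEMA, Schema.BYTES_SCHEMA);
    }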

##########
File path: pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSink.java
##########
@@ -78,32 +89,80 @@ protected Properties beforeCreateProducer(Properties props) {
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
         kafkaSinkConfig = KafkaSinkConfig.load(config);
+        // kafkaSinkConfig.setKafkaConnectorSinkClass("org.apache.kafka.connect.file.FileStreamSinkConnector");
         Objects.requireNonNull(kafkaSinkConfig.getTopic(), "Kafka topic is not set");
-        Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
-        Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is not set");
-        if (kafkaSinkConfig.getBatchSize() <= 0) {
-            throw new IllegalArgumentException("Invalid Kafka Producer batchSize : "
-                + kafkaSinkConfig.getBatchSize());
-        }
-        if (kafkaSinkConfig.getMaxRequestSize() <= 0) {
-            throw new IllegalArgumentException("Invalid Kafka Producer maxRequestSize : "
-                + kafkaSinkConfig.getMaxRequestSize());
-        }
-        if (kafkaSinkConfig.getProducerConfigProperties() != null) {
-            props.putAll(kafkaSinkConfig.getProducerConfigProperties());
-        }
 
-        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
-        props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
-        props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
-        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
-        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
-        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());
+        String kafkaConnectorName = kafkaSinkConfig.getKafkaConnectorSinkClass();
+        if (Strings.isNullOrEmpty(kafkaConnectorName)) {
+            Objects.requireNonNull(kafkaSinkConfig.getBootstrapServers(), "Kafka bootstrapServers is not set");
+            Objects.requireNonNull(kafkaSinkConfig.getAcks(), "Kafka acks mode is not set");
+            if (kafkaSinkConfig.getBatchSize() <= 0) {
+                throw new IllegalArgumentException("Invalid Kafka Producer batchSize : "
+                        + kafkaSinkConfig.getBatchSize());
+            }
+            if (kafkaSinkConfig.getMaxRequestSize() <= 0) {
+                throw new IllegalArgumentException("Invalid Kafka Producer maxRequestSize : "
+                        + kafkaSinkConfig.getMaxRequestSize());
+            }
+            if (kafkaSinkConfig.getProducerConfigProperties() != null) {
+                props.putAll(kafkaSinkConfig.getProducerConfigProperties());
+            }
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaSinkConfig.getBootstrapServers());
+            props.put(ProducerConfig.ACKS_CONFIG, kafkaSinkConfig.getAcks());
+            props.put(ProducerConfig.BATCH_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getBatchSize()));
+            props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(kafkaSinkConfig.getMaxRequestSize()));
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getKeySerializerClass());
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaSinkConfig.getValueSerializerClass());
+
+            producer = new KafkaProducer<>(beforeCreateProducer(props));
+        } else {
+            kafkaSinkConfig.getKafkaConnectorConfigProperties().entrySet().stream()
+                    .forEach(kv -> props.put(kv.getKey(), kv.getValue()));
 
-        producer = new KafkaProducer<>(beforeCreateProducer(props));
+            // todo: schemas from config

Review comment:
       Please create a GitHub issue for this TODO and link it here.
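
Purely as a rough illustration of what such an issue might track, a hedged sketch of deriving the Kafka Connect schemas from configuration instead of hard-coding them; the helper name, the supported type names, and the idea of a type-name property are assumptions for this example, not anything defined in the PR:

    import org.apache.kafka.connect.data.Schema;

    // Hypothetical helper, not in the PR: resolve a Kafka Connect schema from a
    // configured type name instead of hard-coding Schema.STRING_SCHEMA/BYTES_SCHEMA.
    final class ConnectSchemaResolver {
        static Schema resolve(String configuredType) {
            // The configured value (e.g. "STRING", "BYTES", "INT64") and where it
            // would live in KafkaSinkConfig are assumptions for this sketch.
            switch (configuredType == null ? "BYTES" : configuredType.toUpperCase()) {
                case "STRING":
                    return Schema.STRING_SCHEMA;
                case "BYTES":
                    return Schema.BYTES_SCHEMA;
                case "INT64":
                    return Schema.INT64_SCHEMA;
                default:
                    throw new IllegalArgumentException("Unsupported schema type: " + configuredType);
            }
        }
    }

A real change would presumably wire this through KafkaSinkConfig, which is exactly the kind of detail best captured on the tracking issue.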

##########
File path: pulsar-io/kafka/pom.xml
##########
@@ -54,14 +54,23 @@
       <version>${kafka-client.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-api</artifactId>
+      <version>${kafka-client.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>connect-file</artifactId>
+      <version>${kafka-client.version}</version>
+      <scope>test</scope>
+    </dependency>
+
   </dependencies>
 
   <build>
     <plugins>
-      <plugin>

Review comment:
       Isn't this a breaking change?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

