Anonymitaet commented on a change in pull request #10084:
URL: https://github.com/apache/pulsar/pull/10084#discussion_r604884160



##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and 
`valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is 
`org.apache.kafka.common.serialization.StringDeserializer`, you can set 
Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is 
`io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the 
AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the 
`consumerConfigProperties` configuration entry

Review comment:
```suggestion
In this case, you need to set `schema.registry.url` inside of the `consumerConfigProperties` configuration entry
```
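
For context on this suggestion, here is a minimal sketch of where `schema.registry.url` would sit in the connector configuration, built as a plain Java map. The host names, topic, group ID, and the `KafkaSourceConfigSketch` class are placeholders assumed for illustration; they are not taken from the PR.

```java
import java.util.HashMap;
import java.util.Map;

class KafkaSourceConfigSketch {

    // Builds a connector configuration map; every value is an illustrative placeholder.
    static Map<String, Object> buildConfig() {
        // Properties handed through to the underlying Kafka consumer.
        Map<String, Object> consumerProps = new HashMap<>();
        consumerProps.put("schema.registry.url", "http://localhost:8081"); // assumed URL

        Map<String, Object> config = new HashMap<>();
        config.put("bootstrapServers", "localhost:9092"); // assumed broker address
        config.put("groupId", "example-group");           // assumed group ID
        config.put("topic", "example-topic");             // assumed Kafka topic
        config.put("valueDeserializationClass",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        // The registry URL is nested here, not set at the top level.
        config.put("consumerConfigProperties", consumerProps);
        return config;
    }

    public static void main(String[] args) {
        System.out.println(buildConfig());
    }
}
```

The point of the sketch is only the nesting: the registry URL is a consumer property, so it belongs inside `consumerConfigProperties`.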

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.

Review comment:
```suggestion
This Kafka source connector applies the schema to the topic depending on the data type that is present on the Kafka topic.
```

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and 
`valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is 
`org.apache.kafka.common.serialization.StringDeserializer`, you can set 
Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is 
`io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the 
AVRO schema from the Confluent Schema Registry®

Review comment:
```suggestion
If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer`, Pulsar downloads the AVRO schema from the Confluent Schema Registry®
```

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and 
`valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is 
`org.apache.kafka.common.serialization.StringDeserializer`, you can set 
Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is 
`io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the 
AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the 
`consumerConfigProperties` configuration entry
+of the source.
+
+When `keyDeserializationClass` is not 
`org.apache.kafka.common.serialization.StringDeserializer` it means 
+that you do not have a String as key and the Kafka Source uses the KeyValue 
schema type with the SEPARATED encoding.
+
+We are also supporting AVRO format for keys as well.
+
+This way on Pulsar you have a topic with these properties:
+- Schema: KeyValue schema with SEPARATED encoding
+- Key: the content of key of the Kafka message (base64 encoded)
+- Value: the content of value of the Kafka message
+- KeySchema: the schema, detected from `keyDeserializationClass`
+- ValueSchema: the schema, detected from `valueDeserializationClass`
+
+Topic compaction and partition routing use the Pulsar key, that contains the 
Kafka key, and so they are driven by the same value that you have on Kafka.
+
+When you consume data from Pulsar topics, you can use the `KeyValue` schema. 
In this way, you can decode the data properly.
+In case you want to access the raw key you can use the Message#getKeyBytes() 
API.

Review comment:
```suggestion
If you want to access the raw key, you can use the `Message#getKeyBytes()` API.
```
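
As background for this suggestion: the hunk above says the Pulsar key carries the content of the Kafka key, base64 encoded. Below is a stdlib-only sketch of that round trip, with no Pulsar client involved. `RawKeySketch` and its methods are hypothetical names, and the sketch assumes that `Message#getKeyBytes()` hands back the decoded bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;

class RawKeySketch {

    // Encodes raw Kafka key bytes as text, as the "base64 encoded" note describes.
    static String encodeKey(byte[] kafkaKey) {
        return Base64.getEncoder().encodeToString(kafkaKey);
    }

    // Recovers the raw key bytes from the base64 text form.
    static byte[] decodeKey(String encodedKey) {
        return Base64.getDecoder().decode(encodedKey);
    }

    public static void main(String[] args) {
        byte[] kafkaKey = "order-42".getBytes(StandardCharsets.UTF_8); // sample key, assumed
        String pulsarKey = encodeKey(kafkaKey);
        System.out.println(pulsarKey);
        // The decoded bytes should match the original Kafka key.
        System.out.println(Arrays.equals(kafkaKey, decodeKey(pulsarKey)));
    }
}
```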

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and 
`valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is 
`org.apache.kafka.common.serialization.StringDeserializer`, you can set 
Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is 
`io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the 
AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the 
`consumerConfigProperties` configuration entry
+of the source.
+
+When `keyDeserializationClass` is not 
`org.apache.kafka.common.serialization.StringDeserializer` it means 
+that you do not have a String as key and the Kafka Source uses the KeyValue 
schema type with the SEPARATED encoding.
+
+We are also supporting AVRO format for keys as well.
+
+This way on Pulsar you have a topic with these properties:
+- Schema: KeyValue schema with SEPARATED encoding
+- Key: the content of key of the Kafka message (base64 encoded)
+- Value: the content of value of the Kafka message
+- KeySchema: the schema, detected from `keyDeserializationClass`

Review comment:
```suggestion
- KeySchema: the schema detected from `keyDeserializationClass`
```

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and 
`valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is 
`org.apache.kafka.common.serialization.StringDeserializer`, you can set 
Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is 
`io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the 
AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the 
`consumerConfigProperties` configuration entry
+of the source.
+
+When `keyDeserializationClass` is not 
`org.apache.kafka.common.serialization.StringDeserializer` it means 

Review comment:
```suggestion
If `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer`, it means
```
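
To make the rule in this sentence concrete, here is a rough sketch of the decision the new section describes: a String key deserializer keeps a plain value schema, and anything else switches the topic to the KeyValue schema with SEPARATED encoding. `SchemaChoiceSketch`, `describeTopicSchema`, and the returned labels are invented for illustration and do not mirror the connector's actual code.

```java
class SchemaChoiceSketch {

    static final String STRING_DESERIALIZER =
            "org.apache.kafka.common.serialization.StringDeserializer";

    // Returns a human-readable label for the schema layout implied by the key deserializer.
    static String describeTopicSchema(String keyDeserializationClass) {
        if (STRING_DESERIALIZER.equals(keyDeserializationClass)) {
            // String keys: the key stays a plain string and only the value carries a schema.
            return "value schema only (String key)";
        }
        // Any other key type: key and value each get their own schema.
        return "KeyValue schema with SEPARATED encoding";
    }

    public static void main(String[] args) {
        System.out.println(describeTopicSchema(STRING_DESERIALIZER));
        System.out.println(describeTopicSchema(
                "io.confluent.kafka.serializers.KafkaAvroDeserializer"));
    }
}
```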

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.StringDeserializer | The deserializer 
class for Kafka consumers to deserialize keys.<br/> The deserializer is set by 
a specific implementation of 
[`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | 
org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer 
class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset 
policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the 
data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and 
`valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is 
`org.apache.kafka.common.serialization.StringDeserializer`, you can set 
Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is 
`io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the 
AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.

Review comment:
```suggestion
and sets it properly on the Pulsar topic.
```

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the 
following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated 
list of host and port pairs for establishing the initial connection to the 
Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that 
identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch 
response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's 
offset is periodically committed in the background.<br/><br/> This committed 
offset is used when the process fails as the position from which a new consumer 
begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds 
that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is 
set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats 
to the consumer when using Kafka's group management facilities. 
<br/><br/>**Note: `heartbeatIntervalMs` must be smaller than 
`sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect 
consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends 
messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer 
configuration properties to be passed to consumers. <br/><br/>**Note: other 
properties specified in the connector configuration file take precedence over 
this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, you can set Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the `consumerConfigProperties` configuration entry
+of the source.
+
+When `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer` it means 
+that you do not have a String as key and the Kafka Source uses the KeyValue schema type with the SEPARATED encoding.
+
+We are also supporting AVRO format for keys as well.
+
+This way on Pulsar you have a topic with these properties:
+- Schema: KeyValue schema with SEPARATED encoding
+- Key: the content of key of the Kafka message (base64 encoded)
+- Value: the content of value of the Kafka message
+- KeySchema: the schema, detected from `keyDeserializationClass`
+- ValueSchema: the schema, detected from `valueDeserializationClass`

Review comment:
       ```suggestion
   - ValueSchema: the schema detected from `valueDeserializationClass`
   ```

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, you can set Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the `consumerConfigProperties` configuration entry
+of the source.
+
+When `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer` it means 
+that you do not have a String as key and the Kafka Source uses the KeyValue schema type with the SEPARATED encoding.
+
+We are also supporting AVRO format for keys as well.
+
+This way on Pulsar you have a topic with these properties:

Review comment:
       ```suggestion
   In this case, you can have a Pulsar topic with the following properties:
   ```

##########
File path: site2/docs/io-kafka-source.md
##########
@@ -17,19 +17,48 @@ The configuration of the Kafka source connector has the following properties.
 
 | Name | Type| Required | Default | Description 
 |------|----------|---------|-------------|-------------|
-|  `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
+| `bootstrapServers` |String| true | " " (empty string) | A comma-separated list of host and port pairs for establishing the initial connection to the Kafka cluster. |
 | `groupId` |String| true | " " (empty string) | A unique string that identifies the group of consumer processes to which this consumer belongs. |
 | `fetchMinBytes` | long|false | 1 | The minimum byte expected for each fetch response. |
 | `autoCommitEnabled` | boolean |false | true | If set to true, the consumer's offset is periodically committed in the background.<br/><br/> This committed offset is used when the process fails as the position from which a new consumer begins. |
 | `autoCommitIntervalMs` | long|false | 5000 | The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if `autoCommitEnabled` is set to true. |
 | `heartbeatIntervalMs` | long| false | 3000 | The interval between heartbeats to the consumer when using Kafka's group management facilities. <br/><br/>**Note: `heartbeatIntervalMs` must be smaller than `sessionTimeoutMs`**.|
 | `sessionTimeoutMs` | long|false | 30000 | The timeout used to detect consumer failures when using Kafka's group management facility. |
 | `topic` | String|true | " " (empty string)| The Kafka topic which sends messages to Pulsar. |
-|  `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
+| `consumerConfigProperties` | Map| false | " " (empty string) | The consumer configuration properties to be passed to consumers. <br/><br/>**Note: other properties specified in the connector configuration file take precedence over this configuration**. |
 | `keyDeserializationClass` | String|false | org.apache.kafka.common.serialization.StringDeserializer | The deserializer class for Kafka consumers to deserialize keys.<br/> The deserializer is set by a specific implementation of [`KafkaAbstractSource`](https://github.com/apache/pulsar/blob/master/pulsar-io/kafka/src/main/java/org/apache/pulsar/io/kafka/KafkaAbstractSource.java).
 | `valueDeserializationClass` | String|false | org.apache.kafka.common.serialization.ByteArrayDeserializer | The deserializer class for Kafka consumers to deserialize values.
 | `autoOffsetReset` | String | false | "earliest" | The default offset reset policy. |
 
+### Schema Management
+
+This Kafka Source connector applies the Schema to the topic depending on the data type that is present on the Kafka topic.
+You can detect the data type from the `keyDeserializationClass` and `valueDeserializationClass` configuration parameters.
+
+If the `valueDeserializationClass` is `org.apache.kafka.common.serialization.StringDeserializer`, you can set Schema.STRING() as schema type on the Pulsar topic.
+
+If `valueDeserializationClass` is `io.confluent.kafka.serializers.KafkaAvroDeserializer` Pulsar downloads the AVRO schema from the Confluent Schema Registry®
+and set it properly on the Pulsar topic.
+
+In this case you have to set `schema.registry.url` inside of the `consumerConfigProperties` configuration entry
+of the source.
+
+When `keyDeserializationClass` is not `org.apache.kafka.common.serialization.StringDeserializer` it means 
+that you do not have a String as key and the Kafka Source uses the KeyValue schema type with the SEPARATED encoding.
+
+We are also supporting AVRO format for keys as well.

Review comment:
       ```suggestion
   Pulsar supports AVRO format for keys.
   ```
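
For readers following the hunk above, here is a minimal sketch of the AVRO case it describes. It only illustrates where `schema.registry.url` sits relative to `valueDeserializationClass`. The property names come from the table in the diff. The top-level `configs` key, the host names, ports, group ID, and topic name are placeholder assumptions, not values taken from this pull request.

```yaml
# Hypothetical Kafka source connector configuration; every value is a placeholder.
configs:
  bootstrapServers: "localhost:9092"   # assumed Kafka broker address
  groupId: "example-group"             # assumed consumer group
  topic: "example-kafka-topic"         # assumed Kafka topic name
  # Per the documentation under review, choosing this deserializer makes the
  # connector fetch the AVRO schema from the Confluent Schema Registry.
  valueDeserializationClass: "io.confluent.kafka.serializers.KafkaAvroDeserializer"
  consumerConfigProperties:
    # Required in the AVRO case according to the documentation under review.
    "schema.registry.url": "http://localhost:8081"   # assumed registry URL
```

`keyDeserializationClass` is left unset here, so it keeps the `StringDeserializer` default listed in the table and the KeyValue schema with SEPARATED encoding does not apply.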




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

