Ycallaer opened a new issue, #16881:
URL: https://github.com/apache/druid/issues/16881

   Hi,
   As part of a POC, we are trying to ingest Kafka Protobuf messages whose schema is stored in Schema Registry.
   We are following these two resources:
   
   - https://blog.hellmar-becker.de/2022/05/26/ingesting-protobuf-messages-into-apache-druid/
   - https://druid.apache.org/docs/latest/development/extensions-core/protobuf/#when-using-schema-registry
   
   Since this is a POC, no security is enabled on the Kafka cluster; we are using Confluent for Kubernetes (CFK) with the demo repository that Confluent made available.
   
   To run Druid, we are using https://github.com/datainfrahq/druid-operator.
   
   The ingestion spec we are submitting from the UI is the following:
   
   ```
   {
   "type": "kafka",
   "spec": {
       "dataSchema": {
           "dataSource": "etf_dummy_data",
           "timestampSpec": {
               "column": "timestamp",
               "format": "auto"
           },
           "dimensionsSpec": {
               "dimensions": [
                   "unit",
                   "http_method",
                   "http_code",
                   "page",
                   "metricType",
                   "server"
               ],
               "dimensionExclusions": [
                   "timestamp",
                   "value"
               ]
           },
           "metricsSpec": [
               {
                   "name": "count",
                   "type": "count"
               },
               {
                   "name": "value_sum",
                   "fieldName": "value",
                   "type": "doubleSum"
               },
               {
                   "name": "value_min",
                   "fieldName": "value",
                   "type": "doubleMin"
               },
               {
                   "name": "value_max",
                   "fieldName": "value",
                   "type": "doubleMax"
               }
           ],
           "granularitySpec": {
               "type": "uniform",
               "segmentGranularity": "HOUR",
               "queryGranularity": "NONE"
           }
       },
       "tuningConfig": {
           "type": "kafka",
           "maxRowsPerSegment": 5000000
       },
       "ioConfig": {
           "topic": "etf_dummy_data",
           "consumerProperties": {
               "bootstrap.servers": "kafka.confluent.svc.cluster.local:9092"
           },
           "inputFormat": {
               "type": "protobuf",
               "protoBytesDecoder": {
                   "url": "http://schemaregistry.confluent.svc.cluster.local:8081",
                   "type": "schema_registry",
                   "capacity": 100
               },
               "flattenSpec": {
                   "useFieldDiscovery": true
               },
               "binaryAsString": false
           },
           "taskCount": 1,
           "replicas": 1,
           "taskDuration": "PT1H",
           "type": "kafka"
       }
   }
   }
   ```
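Hand-edited JSON specs are easy to break with a trailing comma or a stray character. One way to rule that out is to generate the decoder block and check the final spec with a strict JSON parser. A minimal Python sketch, using only the field names already present in the spec (`type`, `url`, `capacity`); the helper names `schema_registry_decoder` and `is_strict_json` are hypothetical.

```python
import json


def schema_registry_decoder(url: str, capacity: int = 100) -> dict:
    """Build a protoBytesDecoder block with the fields used in the spec (hypothetical helper)."""
    return {"type": "schema_registry", "url": url, "capacity": capacity}


def is_strict_json(text: str) -> bool:
    """Return True if `text` is valid standard JSON (no trailing commas or stray characters)."""
    try:
        json.loads(text)
    except json.JSONDecodeError:
        return False
    return True


decoder = schema_registry_decoder("http://schemaregistry.confluent.svc.cluster.local:8081")
print(json.dumps({"protoBytesDecoder": decoder}, indent=2))

# A trailing comma before the closing brace is rejected by a strict parser.
print(is_strict_json('{"type": "schema_registry", "capacity": 100,}'))  # False
print(is_strict_json(json.dumps(decoder)))  # True
```

Running the whole spec text through `is_strict_json` before pasting it into the console is a cheap way to separate syntax problems from classpath problems.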
   When this spec is submitted, the UI returns the following error:
   ```
   Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder`, problem: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider
    at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 302] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])
   ```
   
   If we provide only the topic and the broker, the connection succeeds, but we get raw binary data. So we know this is not a network or connectivity issue: the errors start only once the Schema Registry part is added.
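Protobuf values are binary in any case, but if the producer used Confluent's Schema Registry serializer, each value also starts with a short header naming the schema ID, which a Schema Registry-based decoder uses to look up the schema. A minimal Python sketch for inspecting a raw value, assuming the standard Confluent wire format (magic byte `0`, then a 4-byte big-endian schema ID); the function name `parse_confluent_header` and the sample bytes are made up for illustration.

```python
import struct
from typing import Tuple


def parse_confluent_header(value: bytes) -> Tuple[int, bytes]:
    """Split a Schema Registry-framed Kafka value into (schema_id, remainder).

    Assumes the Confluent wire format: a magic byte (0) followed by a 4-byte
    big-endian schema ID. For Protobuf, the remainder begins with the
    message-index list, followed by the serialized message.
    """
    if len(value) < 5:
        raise ValueError("value too short for a Schema Registry header")
    magic, schema_id = struct.unpack(">BI", value[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, value[5:]


# Made-up value: schema ID 42, then a few payload bytes.
sample = bytes([0, 0, 0, 0, 42, 0, 0x08, 0x01])
schema_id, rest = parse_confluent_header(sample)
print(schema_id, rest.hex())  # 42 000801
```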
   
   ### Affected Version
   
   I have tested this with the Apache Druid 25.0.0, 28.0.1, and 29.0.1 images.
   
   ### Description
   
   - Cluster size: we are using the tiny cluster definition from https://github.com/datainfrahq/druid-operator/blob/master/examples/tiny-cluster.yaml
   
   - Starting from the e2e example directory, I modified https://github.com/datainfrahq/druid-operator/blob/master/e2e/configs/druid-cr.yaml and added the `druid-protobuf-extensions` extension to the common runtime properties.
     Additionally, through a side (init) container, I have added the following JARs:
   ```
     - containerName: protobuf 
       runAsInit: true
       image: apache/druid:28.0.1
       command:
       - "sh"
       - "-c"
       - "wget -O /tmp/kafka-protobuf-provider-7.6.0.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/7.6.0/kafka-protobuf-provider-7.6.0.jar && wget -O /tmp/kotlin-stdlib-1.9.10.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.9.10/kotlin-stdlib-1.9.10.jar && wget -O /tmp/wire-schema-4.9.9.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/4.9.9/wire-schema-4.9.9.jar && cp /tmp/*.jar /opt/druid/extensions/protobuf-extensions"
       #- "wget -O /tmp/kafka-protobuf-provider-6.0.1.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar && wget -O /tmp/kotlin-stdlib-1.4.0.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar && wget -O /tmp/wire-schema-3.2.2.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar && cp /tmp/*.jar /opt/druid/extensions/protobuf-extensions"
       volumeMounts:
       - name: protobufextensions
         mountPath: "/opt/druid/extensions/protobuf-extensions"  
   ```
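The `sh -c` string is long and easy to mistype when edited by hand. All three URLs follow the standard Maven repository layout (`<repo>/<group path>/<artifactId>/<version>/<artifactId>-<version>.jar`), so the command could be generated from coordinates instead. A minimal Python sketch; `maven_jar_url` and `init_command` are hypothetical helpers, the coordinates and target directory are copied from the command above, and the sketch does not resolve transitive dependencies, so it says nothing about whether these three JARs are sufficient.

```python
from typing import List, Tuple

# (repository base URL, groupId, artifactId, version)
Coordinate = Tuple[str, str, str, str]

CONFLUENT = "https://packages.confluent.io/maven"
CENTRAL = "https://repo1.maven.org/maven2"


def maven_jar_url(repo: str, group_id: str, artifact_id: str, version: str) -> str:
    """Build a JAR URL following the standard Maven repository layout."""
    group_path = group_id.replace(".", "/")
    return f"{repo}/{group_path}/{artifact_id}/{version}/{artifact_id}-{version}.jar"


def init_command(jars: List[Coordinate], target_dir: str) -> str:
    """Build the `sh -c` string: download each JAR into /tmp, then copy them all."""
    steps = [
        f"wget -O /tmp/{artifact}-{version}.jar {maven_jar_url(repo, group, artifact, version)}"
        for repo, group, artifact, version in jars
    ]
    steps.append(f"cp /tmp/*.jar {target_dir}")
    return " && ".join(steps)


JARS: List[Coordinate] = [
    (CONFLUENT, "io.confluent", "kafka-protobuf-provider", "7.6.0"),
    (CENTRAL, "org.jetbrains.kotlin", "kotlin-stdlib", "1.9.10"),
    (CENTRAL, "com.squareup.wire", "wire-schema", "4.9.9"),
]

# Target directory copied from the original command; adjust if needed.
print(init_command(JARS, "/opt/druid/extensions/protobuf-extensions"))
```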
   If I exec into the pod, I can see that the JARs are present on the system.
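Seeing the JARs on disk does not by itself show that the missing class is inside one of them, or that the directory is the one Druid loads for the extension. The `problem:` text in the error is a slash-separated class name, the form in which the JVM usually reports a `NoClassDefFoundError`, so one check is to look for the matching `.class` entry directly. A minimal Python sketch using only the standard library; `jars_containing` is a hypothetical helper, the directory is an assumption taken from the init container's `mountPath`, and since the loader log appears to name extensions by directory (e.g. `[druid-protobuf-extensions]`), it may be worth running it against that directory as well.

```python
import sys
import zipfile
from pathlib import Path
from typing import List


def jars_containing(class_name: str, directory: str) -> List[str]:
    """Return the names of the JARs in `directory` that contain `class_name`.

    `class_name` may be dotted (a.b.C) or slash-separated (a/b/C), which is
    the form shown after "problem:" in the error message.
    """
    root = Path(directory)
    if not root.is_dir():
        raise NotADirectoryError(directory)
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in sorted(root.glob("*.jar")):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar.name)
        except zipfile.BadZipFile:
            print(f"skipping unreadable JAR: {jar.name}", file=sys.stderr)
    return hits


# Assumed path: the mountPath of the init container; adjust to the directory
# the extension is actually loaded from.
EXTENSION_DIR = "/opt/druid/extensions/protobuf-extensions"
MISSING_CLASS = "io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider"

if Path(EXTENSION_DIR).is_dir():
    print(jars_containing(MISSING_CLASS, EXTENSION_DIR) or "class not found in any JAR")
```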
   
   If we change the key `inputFormat` to `parser` (the legacy syntax), the job is submitted, but the Schema Registry configuration disappears and the job eventually fails with an error stating that the `inputFormat` block is missing.
   
   I had already reached out in the [Slack](https://apachedruidworkspace.slack.com/archives/C0309C9L90D/p1723464457849649?thread_ts=1723442601.550529&cid=C0309C9L90D) channel, and was asked to open an issue.
   
   
   Additionally, when these jobs are triggered, I can see the following libraries being loaded, but not the three JARs that I add through the side container. I am not sure whether that is relevant.
   
   ```
   12:34:12.739 [main] WARN  org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory - Clearing all configured appenders for logger root. Using _Injected_Console_Appender_ instead.
   12:34:12.743 [main] WARN  org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory - Clearing all configured appenders for logger root. Using _Injected_Console_Appender_ instead.
   2024-08-12T12:34:13,312 INFO [main] org.hibernate.validator.internal.util.Version - HV000001: Hibernate Validator 6.2.5.Final
   2024-08-12T12:34:14,710 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-avro-extensions], jars: schema-repo-common-0.1.3.jar, druid-avro-extensions-28.0.1.jar, velocity-engine-core-2.3.jar, xz-1.9.jar, avro-1.11.1.jar, guava-31.1-jre.jar, jersey-client-1.19.4.jar, failureaccess-1.0.1.jar, hadoop-client-api-3.3.6.jar, jackson-dataformat-yaml-2.12.7.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, snappy-java-1.1.10.4.jar, swagger-core-1.6.2.jar, schema-repo-client-0.1.3.jar, common-config-5.5.12.jar, javax.annotation-api-1.3.2.jar, schema-repo-avro-0.1.3.jar, swagger-models-1.6.2.jar, checker-qual-3.12.0.jar, gson-2.3.1.jar, schema-repo-api-0.1.3.jar, kafka-clients-5.5.12-ccs.jar, j2objc-annotations-1.3.jar, lz4-java-1.8.0.jar, error_prone_annotations-2.20.0.jar, avro-mapred-1.11.1.jar, kafka-schema-registry-client-5.5.12.jar, avro-ipc-jetty-1.11.1.jar, avro-ipc-1.11.1.jar, zstd-jni-1.5.2-3.jar, commons-lang3-3.12.0.jar, common-utils-5.5.12.jar, swagger-annotations-1.6.2.jar
   2024-08-12T12:34:14,732 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-s3-extensions], jars: jmespath-java-1.12.497.jar, aws-java-sdk-sts-1.12.497.jar, commons-logging-1.1.1.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-s3-extensions-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar
   2024-08-12T12:34:14,740 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-hdfs-storage], jars: jmespath-java-1.12.497.jar, aws-java-sdk-s3-1.12.497.jar, wildfly-openssl-1.1.3.Final.jar, commons-logging-1.1.1.jar, hadoop-client-api-3.3.6.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snappy-java-1.1.10.4.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-hdfs-storage-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, hadoop-client-runtime-3.3.6.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar, hadoop-aws-3.3.6.jar, aws-java-sdk-kms-1.12.497.jar
   2024-08-12T12:34:14,762 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kafka-indexing-service], jars: druid-kafka-indexing-service-28.0.1.jar, snappy-java-1.1.10.4.jar, lz4-java-1.8.0.jar, zstd-jni-1.5.2-3.jar, kafka-clients-3.5.1.jar
   2024-08-12T12:34:14,765 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-datasketches], jars: druid-datasketches-28.0.1.jar, commons-math3-3.6.1.jar
   2024-08-12T12:34:14,772 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kubernetes-extensions], jars: commons-collections4-4.2.jar, checker-qual-2.10.0.jar, joda-convert-2.2.1.jar, bcpkix-jdk15on-1.70.jar, gson-2.8.6.jar, okhttp-3.14.9.jar, jose4j-0.7.3.jar, caffeine-2.8.0.jar, client-java-extended-11.0.4.jar, simpleclient_httpserver-0.9.0.jar, commons-io-2.11.0.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, gson-fire-1.8.5.jar, okio-1.17.2.jar, zjsonpatch-0.4.11.jar, bcprov-ext-jdk15on-1.70.jar, javax.annotation-api-1.3.2.jar, bcprov-jdk15on-1.70.jar, druid-kubernetes-extensions-28.0.1.jar, joda-time-2.12.5.jar, bcutil-jdk15on-1.70.jar, commons-codec-1.13.jar, simpleclient-0.9.0.jar, error_prone_annotations-2.20.0.jar, client-java-api-11.0.4.jar, client-java-11.0.4.jar, protobuf-java-3.24.0.jar, client-java-proto-11.0.4.jar, logging-interceptor-3.14.9.jar, bucket4j-core-4.10.0.jar, commons-lang3-3.12.0.jar, swagger-annotations-1.6.2.jar, simpleclient_common-0.9.0.jar
   2024-08-12T12:34:14,808 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar
   2024-08-12T12:34:15,534 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-avro-extensions], jars: schema-repo-common-0.1.3.jar, druid-avro-extensions-28.0.1.jar, velocity-engine-core-2.3.jar, xz-1.9.jar, avro-1.11.1.jar, guava-31.1-jre.jar, jersey-client-1.19.4.jar, failureaccess-1.0.1.jar, hadoop-client-api-3.3.6.jar, jackson-dataformat-yaml-2.12.7.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, snappy-java-1.1.10.4.jar, swagger-core-1.6.2.jar, schema-repo-client-0.1.3.jar, common-config-5.5.12.jar, javax.annotation-api-1.3.2.jar, schema-repo-avro-0.1.3.jar, swagger-models-1.6.2.jar, checker-qual-3.12.0.jar, gson-2.3.1.jar, schema-repo-api-0.1.3.jar, kafka-clients-5.5.12-ccs.jar, j2objc-annotations-1.3.jar, lz4-java-1.8.0.jar, error_prone_annotations-2.20.0.jar, avro-mapred-1.11.1.jar, kafka-schema-registry-client-5.5.12.jar, avro-ipc-jetty-1.11.1.jar, avro-ipc-1.11.1.jar, zstd-jni-1.5.2-3.jar, commons-lang3-3.12.0.jar, common-utils-5.5.12.jar, swagger-annotations-1.6.2.jar
   2024-08-12T12:34:15,541 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-s3-extensions], jars: jmespath-java-1.12.497.jar, aws-java-sdk-sts-1.12.497.jar, commons-logging-1.1.1.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-s3-extensions-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar
   2024-08-12T12:34:15,547 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-hdfs-storage], jars: jmespath-java-1.12.497.jar, aws-java-sdk-s3-1.12.497.jar, wildfly-openssl-1.1.3.Final.jar, commons-logging-1.1.1.jar, hadoop-client-api-3.3.6.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snappy-java-1.1.10.4.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-hdfs-storage-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, hadoop-client-runtime-3.3.6.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar, hadoop-aws-3.3.6.jar, aws-java-sdk-kms-1.12.497.jar
   2024-08-12T12:34:15,554 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kafka-indexing-service], jars: druid-kafka-indexing-service-28.0.1.jar, snappy-java-1.1.10.4.jar, lz4-java-1.8.0.jar, zstd-jni-1.5.2-3.jar, kafka-clients-3.5.1.jar
   2024-08-12T12:34:15,564 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-datasketches], jars: druid-datasketches-28.0.1.jar, commons-math3-3.6.1.jar
   2024-08-12T12:34:15,588 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kubernetes-extensions], jars: commons-collections4-4.2.jar, checker-qual-2.10.0.jar, joda-convert-2.2.1.jar, bcpkix-jdk15on-1.70.jar, gson-2.8.6.jar, okhttp-3.14.9.jar, jose4j-0.7.3.jar, caffeine-2.8.0.jar, client-java-extended-11.0.4.jar, simpleclient_httpserver-0.9.0.jar, commons-io-2.11.0.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, gson-fire-1.8.5.jar, okio-1.17.2.jar, zjsonpatch-0.4.11.jar, bcprov-ext-jdk15on-1.70.jar, javax.annotation-api-1.3.2.jar, bcprov-jdk15on-1.70.jar, druid-kubernetes-extensions-28.0.1.jar, joda-time-2.12.5.jar, bcutil-jdk15on-1.70.jar, commons-codec-1.13.jar, simpleclient-0.9.0.jar, error_prone_annotations-2.20.0.jar, client-java-api-11.0.4.jar, client-java-11.0.4.jar, protobuf-java-3.24.0.jar, client-java-proto-11.0.4.jar, logging-interceptor-3.14.9.jar, bucket4j-core-4.10.0.jar, commons-lang3-3.12.0.jar, swagger-annotations-1.6.2.jar, simpleclient_common-0.9.0.jar
   2024-08-12T12:34:15,603 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar
   ```
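To compare what the loader reports for each extension with the JARs the init container is meant to add, the `ExtensionsLoader` lines can be parsed. A minimal Python sketch; the regular expression is based only on the line format shown in the log above, `jars_by_extension` is a hypothetical helper, and `expected` holds the three JAR names from the init-container command.

```python
import re
from typing import Dict, List

# Matches lines such as:
# ... org.apache.druid.guice.ExtensionsLoader - Loading extension [name], jars: a.jar, b.jar
LOAD_RE = re.compile(r"Loading extension \[(?P<name>[^\]]+)\], jars: (?P<jars>.*)$")


def jars_by_extension(log_text: str) -> Dict[str, List[str]]:
    """Map each extension name to the JARs the loader reported for it.

    Assumes one log entry per line; a later entry for the same extension
    overwrites an earlier one (the log above loads each extension twice).
    """
    result: Dict[str, List[str]] = {}
    for line in log_text.splitlines():
        match = LOAD_RE.search(line)
        if match:
            result[match["name"]] = [j.strip() for j in match["jars"].split(",") if j.strip()]
    return result


sample = (
    "2024-08-12T12:34:14,808 INFO [main] org.apache.druid.guice.ExtensionsLoader"
    " - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar"
)
loaded = jars_by_extension(sample)
expected = {"kafka-protobuf-provider-7.6.0.jar", "kotlin-stdlib-1.9.10.jar", "wire-schema-4.9.9.jar"}
missing = sorted(expected - set(loaded.get("druid-protobuf-extensions", [])))
print(missing)
```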


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

