Ycallaer opened a new issue, #16881:
URL: https://github.com/apache/druid/issues/16881
Hi,

As part of a POC we are trying to ingest Kafka Protobuf messages whose schema is stored in Schema Registry. We are following these two guides:

- https://blog.hellmar-becker.de/2022/05/26/ingesting-protobuf-messages-into-apache-druid/
- https://druid.apache.org/docs/latest/development/extensions-core/protobuf/#when-using-schema-registry

Since this is a POC, no security is configured on the Kafka cluster. We are using Confluent CFK with the demo repo Confluent made available. To run Druid we are using https://github.com/datainfrahq/druid-operator.

The ingestion spec we are submitting from the UI is the following:

```
{
  "type": "kafka",
  "spec": {
    "dataSchema": {
      "dataSource": "etf_dummy_data",
      "timestampSpec": {
        "column": "timestamp",
        "format": "auto"
      },
      "dimensionsSpec": {
        "dimensions": [
          "unit",
          "http_method",
          "http_code",
          "page",
          "metricType",
          "server"
        ],
        "dimensionExclusions": [
          "timestamp",
          "value"
        ]
      },
      "metricsSpec": [
        { "name": "count", "type": "count" },
        { "name": "value_sum", "fieldName": "value", "type": "doubleSum" },
        { "name": "value_min", "fieldName": "value", "type": "doubleMin" },
        { "name": "value_max", "fieldName": "value", "type": "doubleMax" }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "HOUR",
        "queryGranularity": "NONE"
      }
    },
    "tuningConfig": {
      "type": "kafka",
      "maxRowsPerSegment": 5000000
    },
    "ioConfig": {
      "topic": "etf_dummy_data",
      "consumerProperties": {
        "bootstrap.servers": "kafka.confluent.svc.cluster.local:9092"
      },
      "inputFormat": {
        "type": "protobuf",
        "protoBytesDecoder": {
          "url": "http://schemaregistry.confluent.svc.cluster.local:8081",
          "type": "schema_registry",
          "capacity": 100
        },
        "flattenSpec": {
          "useFieldDiscovery": true
        },
        "binaryAsString": false
      },
      "taskCount": 1,
      "replicas": 1,
      "taskDuration": "PT1H",
      "type": "kafka"
    }
  }
}
```

When this is submitted, the UI reports the following error:

```
Failed to submit supervisor: Cannot construct instance of `org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder`, problem: io/confluent/kafka/schemaregistry/protobuf/ProtobufSchemaProvider
 at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 1, column: 302] (through reference chain: org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec["spec"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIngestionSpec["ioConfig"]->org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig["inputFormat"]->org.apache.druid.data.input.protobuf.ProtobufInputFormat["protoBytesDecoder"])
```

If we provide only the topic and broker, the connection succeeds, but the data arrives as binary. So we know this is not a network or connectivity issue. The errors appear only once the Schema Registry part is added.

### Affected Version

I have tested this with the Apache Druid 25.0.0, 28.0.1 and 29.0.1 images.

### Description

- Cluster size: we are using the tiny cluster definition from https://github.com/datainfrahq/druid-operator/blob/master/examples/tiny-cluster.yaml
- From the e2e example directory, I modified https://github.com/datainfrahq/druid-operator/blob/master/e2e/configs/druid-cr.yaml and added the `druid-protobuf-extensions` extension to the common runtime properties.

Additionally, I added the following JARs through an extra container that runs as an init container:

```
- containerName: protobuf
  runAsInit: true
  image: apache/druid:28.0.1
  command:
    - "sh"
    - "-c"
    - "wget -O /tmp/kafka-protobuf-provider-7.6.0.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/7.6.0/kafka-protobuf-provider-7.6.0.jar && wget -O /tmp/kotlin-stdlib-1.9.10.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.9.10/kotlin-stdlib-1.9.10.jar && wget -O /tmp/wire-schema-4.9.9.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/4.9.9/wire-schema-4.9.9.jar && cp /tmp/*.jar /opt/druid/extensions/protobuf-extensions"
    #- "wget -O /tmp/kafka-protobuf-provider-6.0.1.jar https://packages.confluent.io/maven/io/confluent/kafka-protobuf-provider/6.0.1/kafka-protobuf-provider-6.0.1.jar && wget -O /tmp/kotlin-stdlib-1.4.0.jar https://repo1.maven.org/maven2/org/jetbrains/kotlin/kotlin-stdlib/1.4.0/kotlin-stdlib-1.4.0.jar && wget -O /tmp/wire-schema-3.2.2.jar https://repo1.maven.org/maven2/com/squareup/wire/wire-schema/3.2.2/wire-schema-3.2.2.jar && cp /tmp/*.jar /opt/druid/extensions/protobuf-extensions"
  volumeMounts:
    - name: protobufextensions
      mountPath: "/opt/druid/extensions/protobuf-extensions"
```

If I exec into the pod, I can see that the JARs are present on the filesystem.

If we rename `inputFormat` to `parser` (the legacy syntax), the spec is accepted. However, the Schema Registry configuration disappears from it, and the job eventually fails with an error saying the `inputFormat` block is missing.

I had already reached out in the [Slack](https://apachedruidworkspace.slack.com/archives/C0309C9L90D/p1723464457849649?thread_ts=1723442601.550529&cid=C0309C9L90D) channel, and was asked to open an issue.

Additionally, when these jobs are triggered I can see the following libraries being loaded, but not the three JARs I add through the init container. I am not sure whether that is relevant.

```
12:34:12.739 [main] WARN org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory - Clearing all configured appenders for logger root. Using _Injected_Console_Appender_ instead.
12:34:12.743 [main] WARN org.apache.druid.indexing.common.tasklogs.ConsoleLoggingEnforcementConfigurationFactory - Clearing all configured appenders for logger root. Using _Injected_Console_Appender_ instead.
2024-08-12T12:34:13,312 INFO [main] org.hibernate.validator.internal.util.Version - HV000001: Hibernate Validator 6.2.5.Final
2024-08-12T12:34:14,710 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-avro-extensions], jars: schema-repo-common-0.1.3.jar, druid-avro-extensions-28.0.1.jar, velocity-engine-core-2.3.jar, xz-1.9.jar, avro-1.11.1.jar, guava-31.1-jre.jar, jersey-client-1.19.4.jar, failureaccess-1.0.1.jar, hadoop-client-api-3.3.6.jar, jackson-dataformat-yaml-2.12.7.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, snappy-java-1.1.10.4.jar, swagger-core-1.6.2.jar, schema-repo-client-0.1.3.jar, common-config-5.5.12.jar, javax.annotation-api-1.3.2.jar, schema-repo-avro-0.1.3.jar, swagger-models-1.6.2.jar, checker-qual-3.12.0.jar, gson-2.3.1.jar, schema-repo-api-0.1.3.jar, kafka-clients-5.5.12-ccs.jar, j2objc-annotations-1.3.jar, lz4-java-1.8.0.jar, error_prone_annotations-2.20.0.jar, avro-mapred-1.11.1.jar, kafka-schema-registry-client-5.5.12.jar, avro-ipc-jetty-1.11.1.jar, avro-ipc-1.11.1.jar, zstd-jni-1.5.2-3.jar, commons-lang3-3.12.0.jar, common-utils-5.5.12.jar, swagger-annotations-1.6.2.jar
2024-08-12T12:34:14,732 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-s3-extensions], jars: jmespath-java-1.12.497.jar, aws-java-sdk-sts-1.12.497.jar, commons-logging-1.1.1.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-s3-extensions-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar
2024-08-12T12:34:14,740 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-hdfs-storage], jars: jmespath-java-1.12.497.jar, aws-java-sdk-s3-1.12.497.jar, wildfly-openssl-1.1.3.Final.jar, commons-logging-1.1.1.jar, hadoop-client-api-3.3.6.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snappy-java-1.1.10.4.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-hdfs-storage-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, hadoop-client-runtime-3.3.6.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar, hadoop-aws-3.3.6.jar, aws-java-sdk-kms-1.12.497.jar
2024-08-12T12:34:14,762 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kafka-indexing-service], jars: druid-kafka-indexing-service-28.0.1.jar, snappy-java-1.1.10.4.jar, lz4-java-1.8.0.jar, zstd-jni-1.5.2-3.jar, kafka-clients-3.5.1.jar
2024-08-12T12:34:14,765 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-datasketches], jars: druid-datasketches-28.0.1.jar, commons-math3-3.6.1.jar
2024-08-12T12:34:14,772 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kubernetes-extensions], jars: commons-collections4-4.2.jar, checker-qual-2.10.0.jar, joda-convert-2.2.1.jar, bcpkix-jdk15on-1.70.jar, gson-2.8.6.jar, okhttp-3.14.9.jar, jose4j-0.7.3.jar, caffeine-2.8.0.jar, client-java-extended-11.0.4.jar, simpleclient_httpserver-0.9.0.jar, commons-io-2.11.0.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, gson-fire-1.8.5.jar, okio-1.17.2.jar, zjsonpatch-0.4.11.jar, bcprov-ext-jdk15on-1.70.jar, javax.annotation-api-1.3.2.jar, bcprov-jdk15on-1.70.jar, druid-kubernetes-extensions-28.0.1.jar, joda-time-2.12.5.jar, bcutil-jdk15on-1.70.jar, commons-codec-1.13.jar, simpleclient-0.9.0.jar, error_prone_annotations-2.20.0.jar, client-java-api-11.0.4.jar, client-java-11.0.4.jar, protobuf-java-3.24.0.jar, client-java-proto-11.0.4.jar, logging-interceptor-3.14.9.jar, bucket4j-core-4.10.0.jar, commons-lang3-3.12.0.jar, swagger-annotations-1.6.2.jar, simpleclient_common-0.9.0.jar
2024-08-12T12:34:14,808 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar
2024-08-12T12:34:15,534 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-avro-extensions], jars: schema-repo-common-0.1.3.jar, druid-avro-extensions-28.0.1.jar, velocity-engine-core-2.3.jar, xz-1.9.jar, avro-1.11.1.jar, guava-31.1-jre.jar, jersey-client-1.19.4.jar, failureaccess-1.0.1.jar, hadoop-client-api-3.3.6.jar, jackson-dataformat-yaml-2.12.7.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, snappy-java-1.1.10.4.jar, swagger-core-1.6.2.jar, schema-repo-client-0.1.3.jar, common-config-5.5.12.jar, javax.annotation-api-1.3.2.jar, schema-repo-avro-0.1.3.jar, swagger-models-1.6.2.jar, checker-qual-3.12.0.jar, gson-2.3.1.jar, schema-repo-api-0.1.3.jar, kafka-clients-5.5.12-ccs.jar, j2objc-annotations-1.3.jar, lz4-java-1.8.0.jar, error_prone_annotations-2.20.0.jar, avro-mapred-1.11.1.jar, kafka-schema-registry-client-5.5.12.jar, avro-ipc-jetty-1.11.1.jar, avro-ipc-1.11.1.jar, zstd-jni-1.5.2-3.jar, commons-lang3-3.12.0.jar, common-utils-5.5.12.jar, swagger-annotations-1.6.2.jar
2024-08-12T12:34:15,541 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-s3-extensions], jars: jmespath-java-1.12.497.jar, aws-java-sdk-sts-1.12.497.jar, commons-logging-1.1.1.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-s3-extensions-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar
2024-08-12T12:34:15,547 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-hdfs-storage], jars: jmespath-java-1.12.497.jar, aws-java-sdk-s3-1.12.497.jar, wildfly-openssl-1.1.3.Final.jar, commons-logging-1.1.1.jar, hadoop-client-api-3.3.6.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snappy-java-1.1.10.4.jar, aws-java-sdk-core-1.12.497.jar, httpcore-4.4.11.jar, ion-java-1.0.2.jar, druid-hdfs-storage-28.0.1.jar, joda-time-2.12.5.jar, commons-codec-1.13.jar, hadoop-client-runtime-3.3.6.jar, jackson-dataformat-cbor-2.12.7.jar, httpclient-4.5.13.jar, hadoop-aws-3.3.6.jar, aws-java-sdk-kms-1.12.497.jar
2024-08-12T12:34:15,554 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kafka-indexing-service], jars: druid-kafka-indexing-service-28.0.1.jar, snappy-java-1.1.10.4.jar, lz4-java-1.8.0.jar, zstd-jni-1.5.2-3.jar, kafka-clients-3.5.1.jar
2024-08-12T12:34:15,564 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-datasketches], jars: druid-datasketches-28.0.1.jar, commons-math3-3.6.1.jar
2024-08-12T12:34:15,588 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-kubernetes-extensions], jars: commons-collections4-4.2.jar, checker-qual-2.10.0.jar, joda-convert-2.2.1.jar, bcpkix-jdk15on-1.70.jar, gson-2.8.6.jar, okhttp-3.14.9.jar, jose4j-0.7.3.jar, caffeine-2.8.0.jar, client-java-extended-11.0.4.jar, simpleclient_httpserver-0.9.0.jar, commons-io-2.11.0.jar, jsr305-2.0.1.jar, slf4j-api-1.7.36.jar, snakeyaml-1.33.jar, gson-fire-1.8.5.jar, okio-1.17.2.jar, zjsonpatch-0.4.11.jar, bcprov-ext-jdk15on-1.70.jar, javax.annotation-api-1.3.2.jar, bcprov-jdk15on-1.70.jar, druid-kubernetes-extensions-28.0.1.jar, joda-time-2.12.5.jar, bcutil-jdk15on-1.70.jar, commons-codec-1.13.jar, simpleclient-0.9.0.jar, error_prone_annotations-2.20.0.jar, client-java-api-11.0.4.jar, client-java-11.0.4.jar, protobuf-java-3.24.0.jar, client-java-proto-11.0.4.jar, logging-interceptor-3.14.9.jar, bucket4j-core-4.10.0.jar, commons-lang3-3.12.0.jar, swagger-annotations-1.6.2.jar, simpleclient_common-0.9.0.jar
2024-08-12T12:34:15,603 INFO [main] org.apache.druid.guice.ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar
```
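The observation that the three downloaded JARs never appear in the loader output can be checked mechanically. Below is a minimal sketch of a hypothetical helper script. It is not part of Druid. It parses `ExtensionsLoader - Loading extension [...], jars: ...` lines in the format shown in the log excerpt and reports which of the JARs fetched by the `runAsInit: true` container the loader did not list for a given extension. The sketch assumes that each log entry sits on a single line and that the JAR list is comma-separated, as in the excerpt.

```python
import re
from typing import Dict, Iterable, List, Set

# Matches ExtensionsLoader entries as they appear in the log excerpt, e.g.
# "... ExtensionsLoader - Loading extension [druid-protobuf-extensions], jars: a.jar, b.jar"
LOAD_LINE = re.compile(r"Loading extension \[(?P<name>[^\]]+)\], jars: (?P<jars>.*)$")


def loaded_jars(log_lines: Iterable[str]) -> Dict[str, Set[str]]:
    """Map each extension name to the JARs the loader reported for it.

    Repeated entries for the same extension (the log loads each one twice)
    are merged into a single set.
    """
    result: Dict[str, Set[str]] = {}
    for line in log_lines:
        match = LOAD_LINE.search(line.strip())
        if match is None:
            continue  # not an ExtensionsLoader "Loading extension" line
        jars = {jar.strip() for jar in match.group("jars").split(",") if jar.strip()}
        result.setdefault(match.group("name"), set()).update(jars)
    return result


def missing_jars(log_lines: Iterable[str], extension: str, expected: Iterable[str]) -> List[str]:
    """Return, sorted, the expected JARs that were not reported for `extension`."""
    reported = loaded_jars(log_lines).get(extension, set())
    return sorted(set(expected) - reported)


if __name__ == "__main__":
    # Last ExtensionsLoader entry from the log excerpt.
    excerpt = [
        "2024-08-12T12:34:15,603 INFO [main] org.apache.druid.guice.ExtensionsLoader"
        " - Loading extension [druid-protobuf-extensions], jars: druid-protobuf-extensions-28.0.1.jar",
    ]
    # The three JARs downloaded by the init container command.
    downloaded = [
        "kafka-protobuf-provider-7.6.0.jar",
        "kotlin-stdlib-1.9.10.jar",
        "wire-schema-4.9.9.jar",
    ]
    print(missing_jars(excerpt, "druid-protobuf-extensions", downloaded))
    # prints ['kafka-protobuf-provider-7.6.0.jar', 'kotlin-stdlib-1.9.10.jar', 'wire-schema-4.9.9.jar']
```

To check the full task log from the pod, pass an open file handle as `log_lines`. The script also prints the extension name next to its JARs, which exposes a second detail. The log reports the extension as `[druid-protobuf-extensions]`, while the init container copies into `/opt/druid/extensions/protobuf-extensions`. If the bracketed name is the directory the loader scans (an assumption, not confirmed above), the copied JARs would not reach that extension's classpath.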
