#getting-started


@mohit.asingh: @mohit.asingh has joined the channel
@mohit.asingh: Hello everyone. I am trying to ingest data from a Kafka topic into Apache Pinot, but I don't see any data loaded. Am I missing anything in the config related to Avro?
*Schema:*
```
{
  "schemaName": "test_schema",
  "dimensionFieldSpecs": [
    { "name": "client_id", "dataType": "STRING" },
    { "name": "master_property_id", "dataType": "INT" },
    { "name": "business_unit", "dataType": "STRING" },
    { "name": "error_info_str", "dataType": "STRING" }
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}
```
*Table:*
```
{
  "REALTIME": {
    "tableName": "test_schema_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "schemaName": "test_schema",
      "replication": "1",
      "replicasPerPartition": "1",
      "timeColumnName": "timestamp"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant",
      "tagOverrideConfig": {}
    },
    "tableIndexConfig": {
      "bloomFilterColumns": [],
      "noDictionaryColumns": [],
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false,
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "invertedIndexColumns": [],
      "rangeIndexColumns": [],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "sortedColumn": [],
      "loadMode": "MMAP",
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.topic.name": "TestTopic",
        "stream.kafka.broker.list": "localhost:9092",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "schema.registry.url": "",
        "realtime.segment.flush.threshold.rows": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.segment.size": "100M"
      }
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "ingestionConfig": {
      "transformConfigs": [
        { "columnName": "error_info_str", "transformFunction": "json_format(error_info)" }
      ]
    },
    "isDimTable": false
  }
}
```
*Kafka Avro Schema:*
```
{
  "type": "record",
  "name": "TestRecord",
  "namespace": "com.test.ns",
  "fields": [
    { "name": "client_id", "type": ["null", "string"] },
    { "name": "master_property_id", "type": "int" },
    { "name": "business_unit", "type": ["null", "string"] },
    {
      "name": "error_info",
      "type": {
        "type": "record",
        "name": "ErrorInfo",
        "fields": [
          { "name": "code", "type": ["null", "string"] },
          { "name": "description", "type": ["null", "string"] }
        ]
      }
    },
    { "name": "timestamp", "type": ["null", "long"], "default": null }
  ]
}
```
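As a side note for reproducing this setup end to end, here is a minimal producer sketch (not from the thread; the class name `TestRecordProducer`, the registry URL `http://localhost:8081`, and the sample field values are assumptions) that publishes one `TestRecord` to `TestTopic` via Confluent's `kafka-avro-serializer`:
```
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TestRecordProducer {
  // The Kafka Avro schema from the message above, inlined for a self-contained test.
  private static final String AVRO_SCHEMA = """
      { "type": "record", "name": "TestRecord", "namespace": "com.test.ns",
        "fields": [
          {"name": "client_id", "type": ["null","string"]},
          {"name": "master_property_id", "type": "int"},
          {"name": "business_unit", "type": ["null","string"]},
          {"name": "error_info", "type": { "type": "record", "name": "ErrorInfo",
            "fields": [
              {"name": "code", "type": ["null","string"]},
              {"name": "description", "type": ["null","string"]} ] } },
          {"name": "timestamp", "type": ["null","long"], "default": null} ] }""";

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", "http://localhost:8081"); // assumption: local registry

    Schema schema = new Schema.Parser().parse(AVRO_SCHEMA);

    // Build the nested ErrorInfo record first, then the top-level TestRecord.
    GenericRecord errorInfo = new GenericData.Record(schema.getField("error_info").schema());
    errorInfo.put("code", "E-12345");
    errorInfo.put("description", "Some error description");

    GenericRecord record = new GenericData.Record(schema);
    record.put("client_id", "client-1");
    record.put("master_property_id", 42);
    record.put("business_unit", "UNIT-1");
    record.put("error_info", errorInfo);
    record.put("timestamp", System.currentTimeMillis());

    try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
      producer.send(new ProducerRecord<>("TestTopic", "client-1", record));
      producer.flush();
    }
  }
}
```
`KafkaAvroSerializer` registers the schema with the registry and prefixes each value with the Confluent wire-format header, which is what `KafkaConfluentSchemaRegistryAvroMessageDecoder` expects to find.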
  @mohit.asingh:
```
Caught exception while decoding row, discarding row. Payload is9899�$UNIT-1621781307861E-12345,Some error description�����^
java.io.CharConversionException: Invalid UTF-32 character 0x2010839 (above 0x0010ffff) at char #1, byte #7)
  at shaded.com.fasterxml.jackson.core.io.UTF32Reader.reportInvalid(UTF32Reader.java:195) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
  at shaded.com.fasterxml.jackson.core.io.UTF32Reader.read(UTF32Reader.java:158) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
  at shaded.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._loadMore(ReaderBasedJsonParser.java:258) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
  at shaded.com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:2353) ~[pinot-all-0.7.1-jar-with-dependencies.jar:0.7.1-e22be7c3a39e840321d3658e7505f21768b228d6]
```
I saw this error on the server; it looks like there is some problem decoding messages from the Kafka topic.
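That `CharConversionException` comes from a text parser being pointed at bytes that are not text: Confluent-serialized Kafka values start with a one-byte magic marker and a four-byte schema-registry ID, followed by Avro binary. A small diagnostic sketch (hypothetical helper, not Pinot code) that peeks at this framing:
```
import java.nio.ByteBuffer;

// Peeks at the Confluent wire format: [magic 0x0][4-byte schema ID][Avro binary].
// Useful for confirming a Kafka value is registry-framed Avro rather than JSON/text.
public class ConfluentFramePeek {
  public static void peek(byte[] kafkaValue) {
    ByteBuffer buf = ByteBuffer.wrap(kafkaValue);
    byte magic = buf.get();      // 0x0 marks the Confluent framing
    int schemaId = buf.getInt(); // ID used to fetch the writer schema from the registry
    System.out.printf("magic=%d schemaId=%d avroPayloadBytes=%d%n",
        magic, schemaId, buf.remaining());
  }
}
```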
  @g.kishore: Is there anything else in the log?
  @mohit.asingh: It's working now. I had missed one configuration: `"stream.kafka.decoder.prop.schema.registry.rest.url"`
  @g.kishore: Cool
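For anyone hitting the same decode failure: per the resolution above, the registry URL has to reach the decoder through the `stream.kafka.decoder.prop.` property prefix. A sketch of the relevant `streamConfigs` entries (the URL value is a placeholder assumption):
```
"streamConfigs": {
  "streamType": "kafka",
  "stream.kafka.topic.name": "TestTopic",
  "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
  "stream.kafka.decoder.prop.schema.registry.rest.url": "http://localhost:8081"
}
```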
  @g.kishore: Can you make a fix to the decoder to warn about missing properties?
  @mohit.asingh: @g.kishore just a quick point, correct me if I am wrong: we have to add a timestamp field in the schema when creating a realtime table, and that timestamp will be used for data retention etc.? In my case, let's say I don't have any timestamp in the Kafka Avro schema, but I added a field named `timestamp` in the Apache Pinot schema. Will it pick a default value automatically?
  @g.kishore: No.. you can probably use a UDF to set it to now()
  @mohit.asingh: I was testing the same scenario and realised it takes some negative value; not sure what `-9223372036854776000` indicates as a timestamp. Do I need to use now() in a transform function?
  @g.kishore: Yes
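For context, `-9223372036854776000` is `Long.MIN_VALUE` after a round trip through double precision, which is consistent with Pinot substituting its default null value for a LONG column that is absent from the incoming record. A quick check (hypothetical snippet, not from the thread):
```
public class DefaultNullCheck {
  public static void main(String[] args) {
    System.out.println(Long.MIN_VALUE);          // -9223372036854775808
    System.out.println((double) Long.MIN_VALUE); // -9.223372036854776E18, i.e. -9223372036854776000
  }
}
```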
  @mohit.asingh: It's throwing an error if I add now() in the transform function for timestamp: Invalid table config: test_schema_now_REALTIME. Invalid transform function `'now()'` for column '`timestamp`'
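A note for readers landing here: the rejection is plausibly version-specific. The stack trace above is from Pinot 0.7.1, and `now()` appears to be registered as an ingestion transform function only in later releases. A sketch of the intended `ingestionConfig`, assuming a Pinot version where `now()` is supported:
```
"ingestionConfig": {
  "transformConfigs": [
    { "columnName": "timestamp", "transformFunction": "now()" },
    { "columnName": "error_info_str", "transformFunction": "json_format(error_info)" }
  ]
}
```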

#feat-partial-upsert


@jackie.jxt: @qiaochu Can you try rebuilding the project and see if the problem still exists?
@jackie.jxt: It should not have a problem because it is part of the CI tests on GitHub