minwoo-jung opened a new issue #7100:
URL: https://github.com/apache/incubator-pinot/issues/7100
Hello,
I am using Pinot and it works well. Thank you for making a great product. :)
I have a question because something did not work as I expected.
**How can consuming continue after I recreate the Kafka topic or modify the
topic properties?**
I am storing data through stream ingestion.
For some reason I need to delete the Kafka topic and recreate it. After that,
the consuming segment seems to stop.
That is, no more data is stored.
I have tested the same scenario several times, and each time the consuming
segment stops and no data is saved.
So I tried several methods to solve this problem. Among those attempts, when I
do the following:
1. Disable the table
2. Delete and recreate the Kafka topic
3. Enable the table

the consuming segment sometimes resumes.
However, this does not always work.
I also tried reloading the segment after the consuming segment stopped, but
that did not work either.
I tried various other methods as well, but consuming stayed stopped.
My guess is that when the Kafka topic is recreated, the data offsets change,
and that is what causes this.
In my opinion, if the offset were reset properly inside the Pinot consumer,
the consuming segment should keep accumulating data even after the Kafka
topic is recreated.
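
The guess above can be illustrated with a minimal Python sketch. It is only a toy model under stated assumptions, not Pinot's or Kafka's actual code: `ToyTopicPartition` and `resolve_start_offset` are hypothetical names, the offset numbers are made up, and an out-of-range fetch is modeled as an empty result rather than a real Kafka error.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTopicPartition:
    """Toy stand-in for one Kafka partition (hypothetical, not a Kafka API)."""
    start_offset: int = 0
    messages: list = field(default_factory=list)

    @property
    def end_offset(self) -> int:
        # Offset that the next produced message would receive.
        return self.start_offset + len(self.messages)

    def fetch(self, offset: int) -> list:
        """Return messages from `offset` onward; empty if the offset is out of range."""
        if offset < self.start_offset or offset > self.end_offset:
            return []
        return self.messages[offset - self.start_offset:]


def resolve_start_offset(checkpoint: int, partition: ToyTopicPartition) -> int:
    """Hypothetical reset policy: fall back to the earliest offset
    when the stored checkpoint no longer exists in the partition."""
    if partition.start_offset <= checkpoint <= partition.end_offset:
        return checkpoint
    return partition.start_offset


# The consumer has read the whole old topic, so its checkpoint is 5000.
old_topic = ToyTopicPartition(messages=[f"old-{i}" for i in range(5000)])
checkpoint = old_topic.end_offset

# The topic is deleted and recreated; offsets start from 0 again.
new_topic = ToyTopicPartition(messages=[f"new-{i}" for i in range(10)])

stalled = new_topic.fetch(checkpoint)  # stale checkpoint: nothing comes back
resumed = new_topic.fetch(resolve_start_offset(checkpoint, new_topic))
print(len(stalled), len(resumed))  # prints "0 10"
```

In this model the consumer that keeps its old checkpoint never sees the new messages, while one that resets an out-of-range checkpoint picks them up. Whether Pinot's consuming segment behaves like the first case is exactly what I would like to confirm.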
**How can consuming continue after I recreate the Kafka topic or modify its
properties?**
I don't know the internal logic well. I looked at the code of the
org.apache.pinot.core.realtime.impl.kafka2.KafkaConsumerFactory,
KafkaPartitionLevelConsumer, and KafkaStreamLevelConsumer classes, but couldn't
find any problem.
If the consuming segment stops, re-creating the table may be one option,
but that loses the previously stored data. So I am looking for a way to keep
the consuming segment working normally, without losing data and without
re-creating the table.
Note: the Docker image version I am currently using is as follows.
```
{
  "pinot-protobuf": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-kafka-2.0": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-distribution": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-csv": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-s3": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-yammer": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-segment-uploader-default": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-standalone": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-confluent-avro": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-thrift": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-orc": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-spark": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-azure": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-gcs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-batch-ingestion-hadoop": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-hdfs": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-adls": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-kinesis": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-json": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-minion-builtin-tasks": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-parquet": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb",
  "pinot-segment-writer-file-based": "0.8.0-SNAPSHOT-46009e152b8f56c244e415beefa81dbc626de7cb"
}
```
Table config
```
{
  "REALTIME": {
    "tableName": "systemMetricLong_REALTIME",
    "tableType": "REALTIME",
    "segmentsConfig": {
      "timeType": "MILLISECONDS",
      "schemaName": "systemMetricLong",
      "retentionTimeUnit": "DAYS",
      "retentionTimeValue": "2",
      "timeColumnName": "timestampInEpoch",
      "replicasPerPartition": "1"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "loadMode": "MMAP",
      "sortedColumn": [
        "applicationName"
      ],
      "autoGeneratedInvertedIndex": false,
      "createInvertedIndexDuringSegmentGeneration": false,
      "streamConfigs": {
        "streamType": "kafka",
        "stream.kafka.consumer.type": "lowlevel",
        "stream.kafka.topic.name": "system-metric-long",
        "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
        "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
        "stream.kafka.broker.list": "XXXXXXX",
        "realtime.segment.flush.threshold.raws": "0",
        "realtime.segment.flush.threshold.time": "24h",
        "realtime.segment.flush.threshold.segment.size": "50M",
        "stream.kafka.consumer.prop.auto.offset.reset": "smallest"
      },
      "invertedIndexColumns": [
        "tags"
      ],
      "rangeIndexColumns": [
        "timestampInEpoch"
      ],
      "aggregateMetrics": false,
      "nullHandlingEnabled": true,
      "enableDefaultStarTree": false,
      "enableDynamicStarTreeCreation": false
    },
    "metadata": {
      "customConfigs": {}
    },
    "isDimTable": false
  }
}
```