als-sdin opened a new issue #9386: Kafka Indexing Service failed - could not 
allocate segment for row with timestamp[*]
URL: https://github.com/apache/druid/issues/9386
 
 
   ### Affected Version
   
   0.16.0
   
   ### Description
   
   I have a small cluster ingesting data using the Kafka Indexing Service. It 
was ingesting fine until a task failed, and every subsequently restarted task 
failed with the same error. I had to reset the supervisor and skip some 
messages to get it to continue ingesting, resulting in a little data loss.
   
   Note that the row timestamp is a few months old (2019-12-11T00:00:01.000Z). 
Note that there are segments on the historical and in deepstorage that cover 
that time period.
   
   Digging through the error message and the Druid repository, it appears that 
the exception is thrown because the segmentIdentifier for that row is null. 
[Link](https://github.com/apache/druid/blob/30c24df4d33197ec4b0d37f0ec36bc2e64dce56f/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java#L707)
 to the offending line of code.
   
   I think the preferred way of handling this would be to log the offending 
row to a file and continue ingesting, rather than stopping ingestion 
altogether. Do you agree?
   
   Below is the task report (formatted slightly for readability):
   
   ```json
   {
       "ingestionState": "BUILD_SEGMENTS",
       "unparseableEvents": {},
       "rowStats": {
         "buildSegments": {
           "processed": 2945168,
           "processedWithError": 0,
           "thrownAway": 0,
           "unparseable": 0
         }
       },
       "errorMsg": "
   org.apache.druid.java.util.common.ISE: Could not allocate segment for row 
with timestamp[2019-12-11T00:00:01.000Z]
   at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.runInternal(SeekableStreamIndexTaskRunner.java:651)
   at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner.run(SeekableStreamIndexTaskRunner.java:259)
   at 
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTask.run(SeekableStreamIndexTask.java:177)
   at 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:419)
   at 
org.apache.druid.indexing.overlord.SingleTaskBackgroundRunner$SingleTaskBackgroundRunnerCallable.call(SingleTaskBackgroundRunner.java:391)
   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   at java.lang.Thread.run(Thread.java:748)"
   }
   ```
   
   Below is the task payload:
   
   ```json
   {
       "type": "index_kafka",
       "id": "index_kafka_DataProd_ac63ef887e4e88d_behocdcc",
       "resource": {
         "availabilityGroup": "index_kafka_DataProd_ac63ef887e4e88d",
         "requiredCapacity": 1
       },
       "dataSchema": {
         "dataSource": "DataProd",
         "parser": {
           "type": "string",
           "parseSpec": {
             "format": "json",
             "timestampSpec": {
               "column": "ts",
               "format": "iso"
             },
             "dimensionsSpec": {
               "dimensions": [
                 {
                   "name": "control_group",
                   "type": "string"
                 },
                 {
                   "name": "country",
                   "type": "string"
                 },
                 {
                   "name": "device_name",
                   "type": "string"
                 },
                 {
                   "name": "metric_name",
                   "type": "string"
                 }
               ]
             }
           }
         },
         "metricsSpec": [
           {
             "type": "count",
             "name": "count"
           },
           {
             "type": "doubleSum",
             "name": "sum",
             "fieldName": "metric_value",
             "expression": null
           },
           {
             "type": "doubleMin",
             "name": "min",
             "fieldName": "metric_value",
             "expression": null
           },
           {
             "type": "doubleMax",
             "name": "max",
             "fieldName": "metric_value",
             "expression": null
           }
         ],
         "granularitySpec": {
           "type": "uniform",
           "segmentGranularity": "DAY",
           "queryGranularity": "MINUTE",
           "rollup": true,
           "intervals": null
         },
         "transformSpec": {
           "filter": null,
           "transforms": []
         }
       },
       "tuningConfig": {
         "type": "KafkaTuningConfig",
         "maxRowsInMemory": 1000000,
         "maxBytesInMemory": 0,
         "maxRowsPerSegment": 5000000,
         "maxTotalRows": 20000000,
         "intermediatePersistPeriod": "PT10M",
         "basePersistDirectory": "/var/tmp/druid/1580855764252-0",
         "maxPendingPersists": 0,
         "indexSpec": {
           "bitmap": {
             "type": "concise"
           },
           "dimensionCompression": "lz4",
           "metricCompression": "lz4",
           "longEncoding": "longs"
         },
         "indexSpecForIntermediatePersists": {
           "bitmap": {
             "type": "concise"
           },
           "dimensionCompression": "lz4",
           "metricCompression": "lz4",
           "longEncoding": "longs"
         },
         "buildV9Directly": true,
         "reportParseExceptions": false,
         "handoffConditionTimeout": 0,
         "resetOffsetAutomatically": false,
         "segmentWriteOutMediumFactory": null,
         "intermediateHandoffPeriod": "P2147483647D",
         "logParseExceptions": false,
         "maxParseExceptions": 2147483647,
         "maxSavedParseExceptions": 0,
         "skipSequenceNumberAvailabilityCheck": false
       },
       "ioConfig": {
         "type": "kafka",
         "taskGroupId": 0,
         "baseSequenceName": "index_kafka_DataProd_ac63ef887e4e88d",
         "startPartitions": {
           "type": "end",
           "stream": "switchdin_rollups",
           "topic": "switchdin_rollups",
           "partitionSequenceNumberMap": {
             "0": 419705763,
             "1": 247769423,
             "2": 395630862,
             "3": 555085414
           },
           "partitionOffsetMap": {
             "0": 419705763,
             "1": 247769423,
             "2": 395630862,
             "3": 555085414
           }
         },
         "endPartitions": {
           "type": "end",
           "stream": "switchdin_rollups",
           "topic": "switchdin_rollups",
           "partitionSequenceNumberMap": {
             "0": 9223372036854776000,
             "1": 9223372036854776000,
             "2": 9223372036854776000,
             "3": 9223372036854776000
           },
           "partitionOffsetMap": {
             "0": 9223372036854776000,
             "1": 9223372036854776000,
             "2": 9223372036854776000,
             "3": 9223372036854776000
           }
         },
         "startSequenceNumbers": {
           "type": "start",
           "stream": "switchdin_rollups",
           "topic": "switchdin_rollups",
           "partitionSequenceNumberMap": {
             "0": 419705763,
             "1": 247769423,
             "2": 395630862,
             "3": 555085414
           },
           "partitionOffsetMap": {
             "0": 419705763,
             "1": 247769423,
             "2": 395630862,
             "3": 555085414
           },
           "exclusivePartitions": []
         },
         "endSequenceNumbers": {
           "type": "end",
           "stream": "switchdin_rollups",
           "topic": "switchdin_rollups",
           "partitionSequenceNumberMap": {
             "0": 9223372036854776000,
             "1": 9223372036854776000,
             "2": 9223372036854776000,
             "3": 9223372036854776000
           },
           "partitionOffsetMap": {
             "0": 9223372036854776000,
             "1": 9223372036854776000,
             "2": 9223372036854776000,
             "3": 9223372036854776000
           }
         },
         "consumerProperties": {
           "bootstrap.servers": "kafka.vpca.switchd.in:9092"
         },
         "pollTimeout": 100,
         "useTransaction": true,
         "minimumMessageTime": null,
         "maximumMessageTime": null
       },
       "context": {
         "forceTimeChunkLock": true,
         "checkpoints": 
"{\"0\":{\"0\":419705763,\"1\":247769423,\"2\":395630862,\"3\":555085414}}",
         "IS_INCREMENTAL_HANDOFF_SUPPORTED": true
       },
       "groupId": "index_kafka_DataProd",
       "dataSource": "DataProd"
     }
   ```
   
   Note that I am also seeing this warning message from the coordinator:
   
   ```
   WARN [KafkaSupervisor-DataProd-Reporting-0] 
org.apache.druid.indexing.kafka.supervisor.KafkaSupervisor - Lag metric: Kafka 
partitions [0, 1, 2, 3] do not match task partitions []
   ```
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to