This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 379df5f103 Kinesis docs and logs improvements (#12886)
379df5f103 is described below

commit 379df5f103ca6d2d0049a53b674078e69713aacf
Author: AmatyaAvadhanula <[email protected]>
AuthorDate: Mon Aug 22 14:49:42 2022 +0530

    Kinesis docs and logs improvements (#12886)
    
    Going ahead with the merge. CI is failing because of a code coverage change in the log line.
---
 docs/development/extensions-core/kinesis-ingestion.md              | 2 --
 .../org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java   | 7 +++++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md
index a22c33bcfd..916ea911d3 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -649,7 +649,5 @@ Before you deploy the Kinesis extension to production, consider the following kn
 
 - Avoid implementing more than one Kinesis supervisor that read from the same Kinesis stream for ingestion. Kinesis has a per-shard read throughput limit and having multiple supervisors on the same stream can reduce available read throughput for an individual Supervisor's tasks. Additionally, multiple Supervisors ingesting to the same Druid Datasource can cause increased contention for locks on the Datasource.
 - The only way to change the stream reset policy is to submit a new ingestion spec and set up a new supervisor.
-- Timeouts for retrieving earliest sequence number will cause a reset of the supervisor. The job will resume own its own eventually, but it can trigger alerts.
-- The Kinesis supervisor will not make progress if you have empty shards. Make sure you have at least 1 record in the shard.
 - If ingestion tasks get stuck, the supervisor does not automatically recover. You should monitor ingestion tasks and investigate if your ingestion falls behind.
 - A Kinesis supervisor can sometimes compare the checkpoint offset to retention window of the stream to see if it has fallen behind. These checks fetch the earliest sequence number for Kinesis which can result in `IteratorAgeMilliseconds` becoming very high in AWS CloudWatch.
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
index be3fe419ab..f4a1f074fa 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisRecordSupplier.java
@@ -228,7 +228,9 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
             recordsResult = null;
 
             if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
-              log.warn("OrderedPartitionableRecord buffer full, retrying in [%,dms]", recordBufferFullWait);
+              log.warn("Kinesis records are being processed slower than they are fetched. "
+                       + "OrderedPartitionableRecord buffer full, retrying in [%,dms].",
+                       recordBufferFullWait);
               scheduleBackgroundFetch(recordBufferFullWait);
             }
 
@@ -293,7 +295,8 @@ public class KinesisRecordSupplier implements RecordSupplier<String, String, Byt
             // from this message and back off for a bit to let the buffer drain before retrying.
             if (!records.offer(currRecord, recordBufferOfferTimeout, TimeUnit.MILLISECONDS)) {
               log.warn(
-                  "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms]",
+                  "Kinesis records are being processed slower than they are fetched. "
+                  + "OrderedPartitionableRecord buffer full, storing iterator and retrying in [%,dms].",
                   recordBufferFullWait
               );
 
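The pattern in the patched code offers each record to a bounded buffer with a timeout and, when the buffer is full, logs a warning and schedules a retry instead of blocking indefinitely. A minimal stand-alone sketch of that idea follows; the class name and the `bufferOfferTimeoutMs`/`bufferFullWaitMs` constants are illustrative placeholders, not Druid's actual configuration or API:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Sketch of the buffer-full/backoff pattern: a timed offer into a bounded
// queue, with a warning (and, in Druid, a rescheduled fetch) on failure.
public class BoundedBufferOffer {
    // Illustrative values; KinesisRecordSupplier reads its equivalents
    // (recordBufferOfferTimeout, recordBufferFullWait) from task tuning config.
    static final long bufferOfferTimeoutMs = 100;
    static final long bufferFullWaitMs = 500;

    public static boolean tryOffer(BlockingQueue<String> buffer, String record)
            throws InterruptedException {
        // offer() waits up to the timeout, then returns false if the queue is
        // still full -- unlike put(), it never blocks forever.
        if (!buffer.offer(record, bufferOfferTimeoutMs, TimeUnit.MILLISECONDS)) {
            // This is the point where KinesisRecordSupplier logs the warning
            // above and calls scheduleBackgroundFetch(recordBufferFullWait).
            System.out.printf(
                "Records are being processed slower than they are fetched. "
                + "Buffer full, retrying in [%,dms].%n", bufferFullWaitMs);
            return false;
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> buffer = new ArrayBlockingQueue<>(1);
        System.out.println(tryOffer(buffer, "record-1")); // true: buffer has room
        System.out.println(tryOffer(buffer, "record-2")); // false: buffer full
    }
}
```

The design point the log message is clarifying: a failed timed offer means the consumer is draining the buffer slower than the fetch thread fills it, so the fetcher backs off rather than dropping records or blocking the fetch loop.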


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
