This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 95ca43034f Change default handoffConditionTimeout to 15 minutes. (#14539)
95ca43034f is described below

commit 95ca43034fe661556f56dee7c39b6d7b784a0a2e
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jul 13 13:17:14 2023 -0700

    Change default handoffConditionTimeout to 15 minutes. (#14539)
    
    * Change default handoffConditionTimeout to 15 minutes.
    
    Most of the time, when handoff is taking this long, it's because something
    is preventing Historicals from loading new data. In this case, we have
    two choices:
    
    1) Stop making progress on ingestion, wait for Historicals to load stuff,
       and keep the waiting-for-handoff segments available on realtime tasks.
       (handoffConditionTimeout = 0, the default before this change)
    
    2) Continue making progress on ingestion, by exiting the realtime tasks
       that were waiting for handoff. Once the Historicals get their act
       together, the segments will be loaded, as they are still there on
       deep storage. They will just not be continuously available.
       (handoffConditionTimeout > 0)
    
    I believe most users would prefer option 2, because option 1 risks ingestion
    falling behind the stream, which causes many other problems. It can cause
    data loss if the stream ages out data before we have a chance to ingest it.
    
    Due to the way tuningConfigs are serialized -- defaults are baked into the
    serialized form that is written to the database -- this default change will
    not change anyone's existing supervisors. It will take effect only for newly
    created supervisors.
    
    * Fix tests.
    
    * Update docs/development/extensions-core/kafka-supervisor-reference.md
    
    Co-authored-by: Katya Macedo  <[email protected]>
    
    * Update docs/development/extensions-core/kinesis-ingestion.md
    
    Co-authored-by: Katya Macedo  <[email protected]>
    
    ---------
    
    Co-authored-by: Katya Macedo <[email protected]>
---
 docs/development/extensions-core/kafka-supervisor-reference.md      | 2 +-
 docs/development/extensions-core/kinesis-ingestion.md               | 2 +-
 .../apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java | 6 +++---
 .../indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java  | 5 ++---
 .../druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java    | 6 +++---
 .../kinesis/supervisor/KinesisSupervisorTuningConfigTest.java       | 5 ++---
 .../org/apache/druid/segment/indexing/RealtimeTuningConfig.java     | 3 ++-
 .../org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java | 3 ++-
 8 files changed, 16 insertions(+), 16 deletions(-)

diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md 
b/docs/development/extensions-core/kafka-supervisor-reference.md
index eff12d9185..ebde49e3da 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -204,7 +204,7 @@ The `tuningConfig` is optional and default parameters will 
be used if no `tuning
 | `indexSpec`                       | Object         | Tune how data is 
indexed. See [IndexSpec](#indexspec) for more information.                      
                                                                                
                                                                                
                                                                                
                                                                                
                     [...]
 | `indexSpecForIntermediatePersists`|                | Defines segment storage 
format options to be used at indexing time for intermediate persisted temporary 
segments. This can be used to disable dimension/metric compression on 
intermediate segments to reduce memory required for final merging. However, 
disabling compression on intermediate segments might increase page cache use 
while they are used before getting merged into final segment published, see 
[IndexSpec](#indexspec) for possib [...]
 | `reportParseExceptions`           | Boolean        | *DEPRECATED*. If true, 
exceptions encountered during parsing will be thrown and will halt ingestion; 
if false, unparseable rows and fields will be skipped. Setting 
`reportParseExceptions` to true will override existing configurations for 
`maxParseExceptions` and `maxSavedParseExceptions`, setting 
`maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more 
than 1.                                                         [...]
-| `handoffConditionTimeout`         | Long           | Milliseconds to wait 
for segment handoff. It must be >= 0, where 0 means to wait forever.            
                                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| `handoffConditionTimeout`         | Long           | Number of milliseconds 
to wait for segment handoff. Set to a value >= 0, where 0 means to wait 
indefinitely.                                                                   
                                                                                
                                                                                
                                                                                
                       [...]
 | `resetOffsetAutomatically`        | Boolean        | Controls behavior when 
Druid needs to read Kafka messages that are no longer available (i.e. when 
`OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception 
will bubble up, which will cause your tasks to fail and ingestion to halt. If 
this occurs, manual intervention is required to correct the situation; 
potentially using the [Reset Supervisor 
API](../../api-reference/supervisor-api.md). This mode is useful for pro [...]
 | `workerThreads`                   | Integer        | The number of threads 
that the supervisor uses to handle requests/responses for worker tasks, along 
with any other internal asynchronous operation.                                 
                                                                                
                                                                                
                                                                                
                  [...]
 | `chatAsync`                       | Boolean        | If true, use 
asynchronous communication with indexing tasks, and ignore the `chatThreads` 
parameter. If false, use synchronous communication in a thread pool of size 
`chatThreads`.                                                                  
                                                                                
                                                                                
                                [...]
diff --git a/docs/development/extensions-core/kinesis-ingestion.md 
b/docs/development/extensions-core/kinesis-ingestion.md
index 52abcba4b3..1b92110688 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -283,7 +283,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is 
specified, default param
 |`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for 
more information.|no|
 |`indexSpecForIntermediatePersists`|Object|Defines segment storage format 
options to be used at indexing time for intermediate persisted temporary 
segments. This can be used to disable dimension/metric compression on 
intermediate segments to reduce memory required for final merging. However, 
disabling compression on intermediate segments might increase page cache use 
while they are used before getting merged into final segment published, see 
[IndexSpec](#indexspec) for possible values.|  [...]
 |`reportParseExceptions`|Boolean|If true, exceptions encountered during 
parsing will be thrown and will halt ingestion; if false, unparseable rows and 
fields will be skipped.|no (default == false)|
-|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It 
must be >= 0, where 0 means to wait forever.| no (default == 0)|
+|`handoffConditionTimeout`|Long| Number of milliseconds to wait for segment 
handoff. Set to a value >= 0, where 0 means to wait indefinitely.| no (default 
== 900000 [15 minutes])|
 |`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read 
Kinesis messages that are no longer available.<br/><br/>If false, the exception 
bubbles up, causing tasks to fail and ingestion to halt. If this occurs, manual 
intervention is required to correct the situation, potentially using the [Reset 
Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for 
production, since it highlights issues with ingestion.<br/><br/>If true, Druid 
automatically  [...]
 |`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if 
the current sequence number is still available in a particular Kinesis shard. 
If set to false, the indexing task will attempt to reset the current sequence 
number (or not), depending on the value of `resetOffsetAutomatically`.|no 
(default == false)|
 |`workerThreads`|Integer|The number of threads that the supervisor uses to 
handle requests/responses for worker tasks, along with any other internal 
asynchronous operation.|no (default == min(10, taskCount))|
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index f0b489ec59..1b8b22e2c6 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.kafka;
 
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
@@ -35,6 +34,7 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 
 public class KafkaIndexTaskTuningConfigTest
 {
@@ -43,7 +43,7 @@ public class KafkaIndexTaskTuningConfigTest
   public KafkaIndexTaskTuningConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KafkaIndexTaskModule().getJacksonModules());
+    mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
   }
 
   @Test
@@ -71,7 +71,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertEquals(IndexSpec.DEFAULT, 
config.getIndexSpecForIntermediatePersists());
     Assert.assertEquals(false, config.isReportParseExceptions());
-    Assert.assertEquals(0, config.getHandoffConditionTimeout());
+    Assert.assertEquals(Duration.ofMinutes(15).toMillis(), 
config.getHandoffConditionTimeout());
   }
 
   @Test
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index d9d572220b..e73315fbee 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.kafka.supervisor;
 
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -39,7 +38,7 @@ public class KafkaSupervisorTuningConfigTest
   public KafkaSupervisorTuningConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KafkaIndexTaskModule().getJacksonModules());
+    mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
   }
 
   @Test
@@ -66,7 +65,7 @@ public class KafkaSupervisorTuningConfigTest
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertEquals(IndexSpec.DEFAULT, 
config.getIndexSpecForIntermediatePersists());
     Assert.assertEquals(false, config.isReportParseExceptions());
-    Assert.assertEquals(0, config.getHandoffConditionTimeout());
+    Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), 
config.getHandoffConditionTimeout());
     Assert.assertNull(config.getWorkerThreads());
     Assert.assertNull(config.getChatThreads());
     Assert.assertEquals(8L, (long) config.getChatRetries());
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index ed497a64cf..6136a89942 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -20,7 +20,6 @@
 package org.apache.druid.indexing.kinesis;
 
 import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import nl.jqno.equalsverifier.EqualsVerifier;
 import 
org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
@@ -39,6 +38,7 @@ import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.IOException;
+import java.time.Duration;
 
 public class KinesisIndexTaskTuningConfigTest
 {
@@ -47,7 +47,7 @@ public class KinesisIndexTaskTuningConfigTest
   public KinesisIndexTaskTuningConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KinesisIndexingServiceModule().getJacksonModules());
+    mapper.registerModules(new 
KinesisIndexingServiceModule().getJacksonModules());
   }
 
   @Rule
@@ -76,7 +76,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertFalse(config.isReportParseExceptions());
-    Assert.assertEquals(0, config.getHandoffConditionTimeout());
+    Assert.assertEquals(Duration.ofMinutes(15).toMillis(), 
config.getHandoffConditionTimeout());
     Assert.assertNull(config.getRecordBufferSizeConfigured());
     Assert.assertEquals(10000, 
config.getRecordBufferSizeOrDefault(1_000_000_000, false));
     Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index 320a978f12..140c7094f0 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.druid.indexing.kinesis.supervisor;
 
-import com.fasterxml.jackson.databind.Module;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
 import org.apache.druid.jackson.DefaultObjectMapper;
@@ -38,7 +37,7 @@ public class KinesisSupervisorTuningConfigTest
   public KinesisSupervisorTuningConfigTest()
   {
     mapper = new DefaultObjectMapper();
-    mapper.registerModules((Iterable<Module>) new 
KinesisIndexingServiceModule().getJacksonModules());
+    mapper.registerModules(new 
KinesisIndexingServiceModule().getJacksonModules());
   }
 
   @Test
@@ -64,7 +63,7 @@ public class KinesisSupervisorTuningConfigTest
     Assert.assertEquals(0, config.getMaxPendingPersists());
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertEquals(false, config.isReportParseExceptions());
-    Assert.assertEquals(0, config.getHandoffConditionTimeout());
+    Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), 
config.getHandoffConditionTimeout());
     Assert.assertNull(config.getWorkerThreads());
     Assert.assertNull(config.getChatThreads());
     Assert.assertEquals(8L, (long) config.getChatRetries());
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index b4cda68681..32c95ebc6a 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -37,6 +37,7 @@ import org.joda.time.Period;
 
 import javax.annotation.Nullable;
 import java.io.File;
+import java.time.Duration;
 
 /**
  *
@@ -51,7 +52,7 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
   private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 
1);
   private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
   private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
-  private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
+  private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 
Duration.ofMinutes(15).toMillis();
   private static final long DEFAULT_ALERT_TIMEOUT = 0;
   private static final String DEFAULT_DEDUP_COLUMN = null;
 
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index 535dd108fd..abc9571970 100644
--- 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
 import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
 import org.apache.druid.timeline.partition.NumberedShardSpec;
 import org.hamcrest.CoreMatchers;
+import org.joda.time.Duration;
 import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
@@ -82,7 +83,7 @@ public class RealtimeTuningConfigTest
     );
 
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
-    Assert.assertEquals(0, config.getHandoffConditionTimeout());
+    Assert.assertEquals(Duration.standardMinutes(15).getMillis(), 
config.getHandoffConditionTimeout());
     Assert.assertEquals(0, config.getAlertTimeout());
     Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
     Assert.assertEquals(IndexSpec.DEFAULT, 
config.getIndexSpecForIntermediatePersists());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to