This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 95ca43034f Change default handoffConditionTimeout to 15 minutes. (#14539)
95ca43034f is described below
commit 95ca43034fe661556f56dee7c39b6d7b784a0a2e
Author: Gian Merlino <[email protected]>
AuthorDate: Thu Jul 13 13:17:14 2023 -0700
Change default handoffConditionTimeout to 15 minutes. (#14539)
* Change default handoffConditionTimeout to 15 minutes.
Most of the time, when handoff is taking this long, it's because something
is preventing Historicals from loading new data. In this case, we have
two choices:
1) Stop making progress on ingestion, wait for Historicals to load stuff,
and keep the waiting-for-handoff segments available on realtime tasks.
(handoffConditionTimeout = 0, the current default)
2) Continue making progress on ingestion, by exiting the realtime tasks
that were waiting for handoff. Once the Historicals get their act
together, the segments will be loaded, as they are still there on
deep storage. They will just not be continuously available.
(handoffConditionTimeout > 0)
I believe most users would prefer option 2, because option 1 risks ingestion
falling behind the stream, which causes many other problems. It can even cause
data loss if the stream ages out data before we have a chance to ingest it.
Due to the way tuningConfigs are serialized -- defaults are baked into the
serialized form that is written to the database -- this default change will
not change anyone's existing supervisors. It will take effect for newly
created supervisors.
* Fix tests.
* Update docs/development/extensions-core/kafka-supervisor-reference.md
Co-authored-by: Katya Macedo <[email protected]>
* Update docs/development/extensions-core/kinesis-ingestion.md
Co-authored-by: Katya Macedo <[email protected]>
---------
Co-authored-by: Katya Macedo <[email protected]>
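
For readers skimming the commit message, the following is a minimal sketch of the defaulting behavior described above. It is not Druid's actual code: the class `HandoffTimeoutSketch` and the helpers `resolveTimeout` and `waitsIndefinitely` are hypothetical names introduced for illustration. Only the constant's value (15 minutes, expressed in milliseconds via `java.time.Duration`) and the rule that 0 means "wait indefinitely" come from this commit.

```java
import java.time.Duration;

public class HandoffTimeoutSketch
{
  // Same value as the new default in the diff: 15 minutes in milliseconds.
  public static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis();

  // Hypothetical helper: null stands for "not set in the tuningConfig",
  // in which case the default applies. Negative values are rejected,
  // matching the documented ">= 0" constraint.
  public static long resolveTimeout(Long configured)
  {
    if (configured == null) {
      return DEFAULT_HANDOFF_CONDITION_TIMEOUT;
    }
    if (configured < 0) {
      throw new IllegalArgumentException("handoffConditionTimeout must be >= 0");
    }
    return configured;
  }

  // Hypothetical helper: 0 keeps the previous behavior of waiting indefinitely.
  public static boolean waitsIndefinitely(long timeoutMillis)
  {
    return timeoutMillis == 0;
  }

  public static void main(String[] args)
  {
    System.out.println(resolveTimeout(null));                    // 900000
    System.out.println(waitsIndefinitely(resolveTimeout(0L)));   // true
    System.out.println(waitsIndefinitely(resolveTimeout(null))); // false
  }
}
```

Under these assumptions, a supervisor that explicitly sets `handoffConditionTimeout` to 0 keeps the old wait-forever behavior, while one that omits the field picks up the 15-minute default.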
---
docs/development/extensions-core/kafka-supervisor-reference.md | 2 +-
docs/development/extensions-core/kinesis-ingestion.md | 2 +-
.../apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java | 6 +++---
.../indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java | 5 ++---
.../druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java | 6 +++---
.../kinesis/supervisor/KinesisSupervisorTuningConfigTest.java | 5 ++---
.../org/apache/druid/segment/indexing/RealtimeTuningConfig.java | 3 ++-
.../org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java | 3 ++-
8 files changed, 16 insertions(+), 16 deletions(-)
diff --git a/docs/development/extensions-core/kafka-supervisor-reference.md b/docs/development/extensions-core/kafka-supervisor-reference.md
index eff12d9185..ebde49e3da 100644
--- a/docs/development/extensions-core/kafka-supervisor-reference.md
+++ b/docs/development/extensions-core/kafka-supervisor-reference.md
@@ -204,7 +204,7 @@ The `tuningConfig` is optional and default parameters will be used if no `tuning
| `indexSpec` | Object | Tune how data is
indexed. See [IndexSpec](#indexspec) for more information.
[...]
| `indexSpecForIntermediatePersists`| | Defines segment storage
format options to be used at indexing time for intermediate persisted temporary
segments. This can be used to disable dimension/metric compression on
intermediate segments to reduce memory required for final merging. However,
disabling compression on intermediate segments might increase page cache use
while they are used before getting merged into final segment published, see
[IndexSpec](#indexspec) for possib [...]
| `reportParseExceptions` | Boolean | *DEPRECATED*. If true,
exceptions encountered during parsing will be thrown and will halt ingestion;
if false, unparseable rows and fields will be skipped. Setting
`reportParseExceptions` to true will override existing configurations for
`maxParseExceptions` and `maxSavedParseExceptions`, setting
`maxParseExceptions` to 0 and limiting `maxSavedParseExceptions` to no more
than 1. [...]
-| `handoffConditionTimeout` | Long | Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever. [...]
+| `handoffConditionTimeout` | Long | Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely. [...]
| `resetOffsetAutomatically` | Boolean | Controls behavior when
Druid needs to read Kafka messages that are no longer available (i.e. when
`OffsetOutOfRangeException` is encountered).<br/><br/>If false, the exception
will bubble up, which will cause your tasks to fail and ingestion to halt. If
this occurs, manual intervention is required to correct the situation;
potentially using the [Reset Supervisor
API](../../api-reference/supervisor-api.md). This mode is useful for pro [...]
| `workerThreads` | Integer | The number of threads
that the supervisor uses to handle requests/responses for worker tasks, along
with any other internal asynchronous operation.
[...]
| `chatAsync` | Boolean | If true, use
asynchronous communication with indexing tasks, and ignore the `chatThreads`
parameter. If false, use synchronous communication in a thread pool of size
`chatThreads`.
[...]
diff --git a/docs/development/extensions-core/kinesis-ingestion.md b/docs/development/extensions-core/kinesis-ingestion.md
index 52abcba4b3..1b92110688 100644
--- a/docs/development/extensions-core/kinesis-ingestion.md
+++ b/docs/development/extensions-core/kinesis-ingestion.md
@@ -283,7 +283,7 @@ The `tuningConfig` is optional. If no `tuningConfig` is specified, default param
|`indexSpec`|Object|Tune how data is indexed. See [IndexSpec](#indexspec) for
more information.|no|
|`indexSpecForIntermediatePersists`|Object|Defines segment storage format
options to be used at indexing time for intermediate persisted temporary
segments. This can be used to disable dimension/metric compression on
intermediate segments to reduce memory required for final merging. However,
disabling compression on intermediate segments might increase page cache use
while they are used before getting merged into final segment published, see
[IndexSpec](#indexspec) for possible values.| [...]
|`reportParseExceptions`|Boolean|If true, exceptions encountered during
parsing will be thrown and will halt ingestion; if false, unparseable rows and
fields will be skipped.|no (default == false)|
-|`handoffConditionTimeout`|Long| Milliseconds to wait for segment handoff. It must be >= 0, where 0 means to wait forever.| no (default == 0)|
+|`handoffConditionTimeout`|Long| Number of milliseconds to wait for segment handoff. Set to a value >= 0, where 0 means to wait indefinitely.| no (default == 900000 [15 minutes])|
|`resetOffsetAutomatically`|Boolean|Controls behavior when Druid needs to read
Kinesis messages that are no longer available.<br/><br/>If false, the exception
bubbles up, causing tasks to fail and ingestion to halt. If this occurs, manual
intervention is required to correct the situation, potentially using the [Reset
Supervisor API](../../api-reference/supervisor-api.md). This mode is useful for
production, since it highlights issues with ingestion.<br/><br/>If true, Druid
automatically [...]
|`skipSequenceNumberAvailabilityCheck`|Boolean|Whether to enable checking if
the current sequence number is still available in a particular Kinesis shard.
If set to false, the indexing task will attempt to reset the current sequence
number (or not), depending on the value of `resetOffsetAutomatically`.|no
(default == false)|
|`workerThreads`|Integer|The number of threads that the supervisor uses to
handle requests/responses for worker tasks, along with any other internal
asynchronous operation.|no (default == min(10, taskCount))|
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index f0b489ec59..1b8b22e2c6 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.kafka;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
@@ -35,6 +34,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
public class KafkaIndexTaskTuningConfigTest
{
@@ -43,7 +43,7 @@ public class KafkaIndexTaskTuningConfigTest
public KafkaIndexTaskTuningConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
+ mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}
@Test
@@ -71,7 +71,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
- Assert.assertEquals(0, config.getHandoffConditionTimeout());
+ Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
}
@Test
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index d9d572220b..e73315fbee 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.kafka.supervisor;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -39,7 +38,7 @@ public class KafkaSupervisorTuningConfigTest
public KafkaSupervisorTuningConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new KafkaIndexTaskModule().getJacksonModules());
+ mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}
@Test
@@ -66,7 +65,7 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
Assert.assertEquals(false, config.isReportParseExceptions());
- Assert.assertEquals(0, config.getHandoffConditionTimeout());
+ Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
Assert.assertNull(config.getChatThreads());
Assert.assertEquals(8L, (long) config.getChatRetries());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index ed497a64cf..6136a89942 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -20,7 +20,6 @@
package org.apache.druid.indexing.kinesis;
import com.fasterxml.jackson.databind.JsonMappingException;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kinesis.supervisor.KinesisSupervisorTuningConfig;
@@ -39,6 +38,7 @@ import org.junit.rules.ExpectedException;
import java.io.File;
import java.io.IOException;
+import java.time.Duration;
public class KinesisIndexTaskTuningConfigTest
{
@@ -47,7 +47,7 @@ public class KinesisIndexTaskTuningConfigTest
public KinesisIndexTaskTuningConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new KinesisIndexingServiceModule().getJacksonModules());
+ mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules());
}
@Rule
@@ -76,7 +76,7 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertFalse(config.isReportParseExceptions());
- Assert.assertEquals(0, config.getHandoffConditionTimeout());
+ Assert.assertEquals(Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getRecordBufferSizeConfigured());
Assert.assertEquals(10000, config.getRecordBufferSizeOrDefault(1_000_000_000, false));
Assert.assertEquals(5000, config.getRecordBufferOfferTimeout());
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index 320a978f12..140c7094f0 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -19,7 +19,6 @@
package org.apache.druid.indexing.kinesis.supervisor;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.indexing.kinesis.KinesisIndexingServiceModule;
import org.apache.druid.jackson.DefaultObjectMapper;
@@ -38,7 +37,7 @@ public class KinesisSupervisorTuningConfigTest
public KinesisSupervisorTuningConfigTest()
{
mapper = new DefaultObjectMapper();
- mapper.registerModules((Iterable<Module>) new KinesisIndexingServiceModule().getJacksonModules());
+ mapper.registerModules(new KinesisIndexingServiceModule().getJacksonModules());
}
@Test
@@ -64,7 +63,7 @@ public class KinesisSupervisorTuningConfigTest
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(false, config.isReportParseExceptions());
- Assert.assertEquals(0, config.getHandoffConditionTimeout());
+ Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
Assert.assertNull(config.getChatThreads());
Assert.assertEquals(8L, (long) config.getChatRetries());
diff --git a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index b4cda68681..32c95ebc6a 100644
--- a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -37,6 +37,7 @@ import org.joda.time.Period;
import javax.annotation.Nullable;
import java.io.File;
+import java.time.Duration;
/**
*
@@ -51,7 +52,7 @@ public class RealtimeTuningConfig implements AppenderatorConfig
private static final ShardSpec DEFAULT_SHARD_SPEC = new NumberedShardSpec(0, 1);
private static final IndexSpec DEFAULT_INDEX_SPEC = IndexSpec.DEFAULT;
private static final Boolean DEFAULT_REPORT_PARSE_EXCEPTIONS = Boolean.FALSE;
- private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
+ private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = Duration.ofMinutes(15).toMillis();
private static final long DEFAULT_ALERT_TIMEOUT = 0;
private static final String DEFAULT_DEDUP_COLUMN = null;
diff --git a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index 535dd108fd..abc9571970 100644
--- a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -26,6 +26,7 @@ import org.apache.druid.segment.data.CompressionStrategy;
import org.apache.druid.segment.incremental.OnheapIncrementalIndex;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.hamcrest.CoreMatchers;
+import org.joda.time.Duration;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
@@ -82,7 +83,7 @@ public class RealtimeTuningConfigTest
);
Assert.assertEquals(new OnheapIncrementalIndex.Spec(), config.getAppendableIndexSpec());
- Assert.assertEquals(0, config.getHandoffConditionTimeout());
+ Assert.assertEquals(Duration.standardMinutes(15).getMillis(), config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());