This is an automated email from the ASF dual-hosted git repository.
gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new d98c808d3f Remove basePersistDirectory from tuning configs. (#13040)
d98c808d3f is described below
commit d98c808d3f087645d8dcfd516e77b0058bdfd389
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Oct 21 17:25:36 2022 -0700
Remove basePersistDirectory from tuning configs. (#13040)
* Remove basePersistDirectory from tuning configs.
Since the removal of CliRealtime, it serves no purpose: in production it
is always overridden via withBasePersistDirectory, which is passed a
subdirectory of the task work directory.
Removing this from the tuning config has a benefit beyond removing
no-longer-needed logic: it also avoids the side effect of empty
"druid-realtime-persist" directories getting created in the systemwide
temp directory.
* Test adjustments to appropriately set basePersistDirectory.
* Remove unused import.
* Fix RealtimeAppenderatorTuningConfig (RATC) constructor.
---
.../indexing/kafka/KafkaIndexTaskTuningConfig.java | 55 ++++++-
.../supervisor/KafkaSupervisorTuningConfig.java | 8 +-
.../kafka/KafkaIndexTaskTuningConfigTest.java | 12 +-
.../kafka/supervisor/KafkaSupervisorTest.java | 6 -
.../KafkaSupervisorTuningConfigTest.java | 6 +-
.../TestModifiedKafkaIndexTaskTuningConfig.java | 4 +-
.../kinesis/KinesisIndexTaskTuningConfig.java | 93 +++++++++---
.../supervisor/KinesisSupervisorTuningConfig.java | 8 +-
.../kinesis/KinesisIndexTaskTuningConfigTest.java | 11 +-
.../kinesis/supervisor/KinesisSupervisorTest.java | 4 -
.../KinesisSupervisorTuningConfigTest.java | 6 +-
.../TestModifiedKinesisIndexTaskTuningConfig.java | 4 +-
.../index/RealtimeAppenderatorTuningConfig.java | 95 ++++++++----
.../SeekableStreamIndexTaskTuningConfig.java | 3 +-
.../segment/indexing/RealtimeTuningConfig.java | 105 ++++++++-----
.../segment/indexing/RealtimeTuningConfigTest.java | 13 +-
.../appenderator/AppenderatorPlumberTest.java | 24 ++-
.../appenderator/StreamAppenderatorDriverTest.java | 13 +-
.../appenderator/StreamAppenderatorTest.java | 165 +++++++++++++++------
.../appenderator/StreamAppenderatorTester.java | 115 ++++++++------
.../plumber/RealtimePlumberSchoolTest.java | 7 +-
21 files changed, 500 insertions(+), 257 deletions(-)
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index c0dfb334e8..be3aac9ac2 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -32,8 +32,54 @@ import java.io.File;
public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningConfig
{
- @JsonCreator
public KafkaIndexTaskTuningConfig(
+ @Nullable AppendableIndexSpec appendableIndexSpec,
+ @Nullable Integer maxRowsInMemory,
+ @Nullable Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Long maxTotalRows,
+ @Nullable Period intermediatePersistPeriod,
+ @Nullable File basePersistDirectory,
+ @Nullable Integer maxPendingPersists,
+ @Nullable IndexSpec indexSpec,
+ @Nullable IndexSpec indexSpecForIntermediatePersists,
+ @Nullable Boolean reportParseExceptions,
+ @Nullable Long handoffConditionTimeout,
+ @Nullable Boolean resetOffsetAutomatically,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable Period intermediateHandoffPeriod,
+ @Nullable Boolean logParseExceptions,
+ @Nullable Integer maxParseExceptions,
+ @Nullable Integer maxSavedParseExceptions
+ )
+ {
+ super(
+ appendableIndexSpec,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ false,
+ segmentWriteOutMediumFactory,
+ intermediateHandoffPeriod,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions
+ );
+ }
+
+ @JsonCreator
+ private KafkaIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@@ -41,7 +87,6 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") @Nullable File
basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
@@ -55,7 +100,7 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions
)
{
- super(
+ this(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
@@ -63,14 +108,13 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
- basePersistDirectory,
+ null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
reportParseExceptions,
handoffConditionTimeout,
resetOffsetAutomatically,
- false,
segmentWriteOutMediumFactory,
intermediateHandoffPeriod,
logParseExceptions,
@@ -116,7 +160,6 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
", maxBytesInMemory=" + getMaxBytesInMemory() +
", skipBytesInMemoryOverheadCheck=" +
isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
- ", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", indexSpecForIntermediatePersists=" +
getIndexSpecForIntermediatePersists() +
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 4fc5489a8d..596e532924 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -29,7 +29,6 @@ import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
-import java.io.File;
public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
implements SeekableStreamSupervisorTuningConfig
@@ -67,7 +66,6 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
null,
null,
null,
- null,
null
);
}
@@ -80,7 +78,6 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
@@ -108,7 +105,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
- basePersistDirectory,
+ null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
@@ -198,7 +195,6 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
", skipBytesInMemoryOverheadCheck=" +
isSkipBytesInMemoryOverheadCheck() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
- ", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", reportParseExceptions=" + isReportParseExceptions() +
@@ -229,7 +225,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
- getBasePersistDirectory(),
+ null,
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 6674c36f17..2c5bad93cb 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -61,7 +61,7 @@ public class KafkaIndexTaskTuningConfigTest
TuningConfig.class
);
- Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -102,7 +102,7 @@ public class KafkaIndexTaskTuningConfigTest
TuningConfig.class
);
- Assert.assertEquals(new File("/tmp/xxx"),
config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
@@ -127,7 +127,6 @@ public class KafkaIndexTaskTuningConfigTest
2,
10L,
new Period("PT3S"),
- new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
@@ -146,7 +145,7 @@ public class KafkaIndexTaskTuningConfigTest
null,
null
);
- KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig)
original.convertToTaskTuningConfig();
+ KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
Assert.assertEquals(original.getAppendableIndexSpec(),
copy.getAppendableIndexSpec());
Assert.assertEquals(1, copy.getMaxRowsInMemory());
@@ -154,7 +153,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertNotEquals(null, copy.getMaxTotalRows());
Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
Assert.assertEquals(new Period("PT3S"),
copy.getIntermediatePersistPeriod());
- Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
+ Assert.assertNull(copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertEquals(true, copy.isReportParseExceptions());
@@ -197,7 +196,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxRowsPerSegment(),
deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(),
deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(),
deserialized.getIntermediatePersistPeriod());
- Assert.assertEquals(base.getBasePersistDirectory(),
deserialized.getBasePersistDirectory());
+ Assert.assertNull(deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(),
deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.isReportParseExceptions(),
deserialized.isReportParseExceptions());
@@ -221,7 +220,6 @@ public class KafkaIndexTaskTuningConfigTest
2,
10L,
new Period("PT3S"),
- new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c30795cdd1..78b010b3b3 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -115,7 +115,6 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
@@ -313,7 +312,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
@@ -3771,7 +3769,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
@@ -3811,7 +3808,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
@@ -4138,7 +4134,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
@@ -4252,7 +4247,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 7151eb0681..5ac97b12c8 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -32,8 +32,6 @@ import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
-
public class KafkaSupervisorTuningConfigTest
{
private final ObjectMapper mapper;
@@ -59,7 +57,7 @@ public class KafkaSupervisorTuningConfigTest
TuningConfig.class
);
- Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -110,7 +108,7 @@ public class KafkaSupervisorTuningConfigTest
TuningConfig.class
);
- Assert.assertEquals(new File("/tmp/xxx"),
config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 642141c3c4..fd6cbb48b8 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -29,7 +29,6 @@ import
org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
import javax.annotation.Nullable;
-import java.io.File;
@JsonTypeName("KafkaTuningConfig")
public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuningConfig
@@ -45,7 +44,6 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuning
@JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") @Nullable Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") @Nullable File
basePersistDirectory,
@JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
@JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
@@ -68,7 +66,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuning
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
- basePersistDirectory,
+ null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 21c0510758..4e9dbafb7c 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -47,8 +47,72 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
private final Integer fetchThreads;
private final int maxRecordsPerPoll;
- @JsonCreator
public KinesisIndexTaskTuningConfig(
+ @Nullable AppendableIndexSpec appendableIndexSpec,
+ Integer maxRowsInMemory,
+ Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
+ Integer maxRowsPerSegment,
+ Long maxTotalRows,
+ Period intermediatePersistPeriod,
+ File basePersistDirectory,
+ Integer maxPendingPersists,
+ IndexSpec indexSpec,
+ @Nullable IndexSpec indexSpecForIntermediatePersists,
+ Boolean reportParseExceptions,
+ Long handoffConditionTimeout,
+ Boolean resetOffsetAutomatically,
+ Boolean skipSequenceNumberAvailabilityCheck,
+ Integer recordBufferSize,
+ Integer recordBufferOfferTimeout,
+ Integer recordBufferFullWait,
+ Integer fetchThreads,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable Boolean logParseExceptions,
+ @Nullable Integer maxParseExceptions,
+ @Nullable Integer maxSavedParseExceptions,
+ @Nullable Integer maxRecordsPerPoll,
+ @Nullable Period intermediateHandoffPeriod
+ )
+ {
+ super(
+ appendableIndexSpec,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ skipSequenceNumberAvailabilityCheck,
+ segmentWriteOutMediumFactory,
+ intermediateHandoffPeriod,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions
+ );
+ this.recordBufferSize = recordBufferSize == null ?
DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
+ this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
+ ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
+ : recordBufferOfferTimeout;
+ this.recordBufferFullWait = recordBufferFullWait == null ?
DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
+ this.fetchThreads = fetchThreads; // we handle this being null later
+ this.maxRecordsPerPoll = maxRecordsPerPoll == null ?
DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
+
+ Preconditions.checkArgument(
+ !(super.isResetOffsetAutomatically() &&
super.isSkipSequenceNumberAvailabilityCheck()),
+ "resetOffsetAutomatically cannot be used if
skipSequenceNumberAvailabilityCheck=true"
+ );
+ }
+
+ @JsonCreator
+ private KinesisIndexTaskTuningConfig(
@JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
@JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
@JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@@ -56,7 +120,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
@@ -76,7 +139,7 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
@JsonProperty("intermediateHandoffPeriod") @Nullable Period
intermediateHandoffPeriod
)
{
- super(
+ this(
appendableIndexSpec,
maxRowsInMemory,
maxBytesInMemory,
@@ -84,7 +147,7 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
- basePersistDirectory,
+ null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
@@ -92,23 +155,16 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
handoffConditionTimeout,
resetOffsetAutomatically,
skipSequenceNumberAvailabilityCheck,
+ recordBufferSize,
+ recordBufferOfferTimeout,
+ recordBufferFullWait,
+ fetchThreads,
segmentWriteOutMediumFactory,
- intermediateHandoffPeriod,
logParseExceptions,
maxParseExceptions,
- maxSavedParseExceptions
- );
- this.recordBufferSize = recordBufferSize == null ?
DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
- this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
- ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
- : recordBufferOfferTimeout;
- this.recordBufferFullWait = recordBufferFullWait == null ?
DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
- this.fetchThreads = fetchThreads; // we handle this being null later
- this.maxRecordsPerPoll = maxRecordsPerPoll == null ?
DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
-
- Preconditions.checkArgument(
- !(super.isResetOffsetAutomatically() &&
super.isSkipSequenceNumberAvailabilityCheck()),
- "resetOffsetAutomatically cannot be used if
skipSequenceNumberAvailabilityCheck=true"
+ maxSavedParseExceptions,
+ maxRecordsPerPoll,
+ intermediateHandoffPeriod
);
}
@@ -217,7 +273,6 @@ public class KinesisIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningC
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
- ", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", reportParseExceptions=" + isReportParseExceptions() +
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 57bae9f2a8..ba8f4311f0 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -29,7 +29,6 @@ import org.joda.time.Duration;
import org.joda.time.Period;
import javax.annotation.Nullable;
-import java.io.File;
public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
implements SeekableStreamSupervisorTuningConfig
@@ -79,7 +78,6 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
null,
null,
null,
- null,
null
);
}
@@ -92,7 +90,6 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
@@ -129,7 +126,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
- basePersistDirectory,
+ null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
@@ -239,7 +236,6 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
", maxRowsPerSegment=" + getMaxRowsPerSegment() +
", maxTotalRows=" + getMaxTotalRows() +
", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
- ", basePersistDirectory=" + getBasePersistDirectory() +
", maxPendingPersists=" + getMaxPendingPersists() +
", indexSpec=" + getIndexSpec() +
", reportParseExceptions=" + isReportParseExceptions() +
@@ -278,7 +274,7 @@ public class KinesisSupervisorTuningConfig extends
KinesisIndexTaskTuningConfig
getMaxRowsPerSegment(),
getMaxTotalRows(),
getIntermediatePersistPeriod(),
- getBasePersistDirectory(),
+ null,
getMaxPendingPersists(),
getIndexSpec(),
getIndexSpecForIntermediatePersists(),
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 9aa1f6127b..61db57449d 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -67,7 +67,7 @@ public class KinesisIndexTaskTuningConfigTest
TuningConfig.class
);
- Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -115,7 +115,7 @@ public class KinesisIndexTaskTuningConfigTest
TuningConfig.class
);
- Assert.assertEquals(new File("/tmp/xxx"),
config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
@@ -173,7 +173,7 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxRowsPerSegment(),
deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(),
deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(),
deserialized.getIntermediatePersistPeriod());
- Assert.assertEquals(base.getBasePersistDirectory(),
deserialized.getBasePersistDirectory());
+ Assert.assertNull(deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(),
deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.isReportParseExceptions(),
deserialized.isReportParseExceptions());
@@ -231,7 +231,7 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(base.getMaxRowsPerSegment(),
deserialized.getMaxRowsPerSegment());
Assert.assertEquals(base.getMaxTotalRows(),
deserialized.getMaxTotalRows());
Assert.assertEquals(base.getIntermediatePersistPeriod(),
deserialized.getIntermediatePersistPeriod());
- Assert.assertEquals(base.getBasePersistDirectory(),
deserialized.getBasePersistDirectory());
+ Assert.assertNull(deserialized.getBasePersistDirectory());
Assert.assertEquals(base.getMaxPendingPersists(),
deserialized.getMaxPendingPersists());
Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
Assert.assertEquals(base.isReportParseExceptions(),
deserialized.isReportParseExceptions());
@@ -286,7 +286,6 @@ public class KinesisIndexTaskTuningConfigTest
2,
100L,
new Period("PT3S"),
- new File("/tmp/xxx"),
4,
new IndexSpec(),
new IndexSpec(),
@@ -322,7 +321,7 @@ public class KinesisIndexTaskTuningConfigTest
Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
Assert.assertEquals(100L, (long) copy.getMaxTotalRows());
Assert.assertEquals(new Period("PT3S"),
copy.getIntermediatePersistPeriod());
- Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
+ Assert.assertNull(copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
Assert.assertTrue(copy.isReportParseExceptions());
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 99d8d5223b..be836567eb 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -103,7 +103,6 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import java.io.File;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -185,7 +184,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
@@ -3948,7 +3946,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
@@ -5061,7 +5058,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
50000,
null,
new Period("P1Y"),
- new File("/test"),
null,
null,
null,
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index c043cbdea5..a5c48b35b6 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -31,8 +31,6 @@ import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Test;
-import java.io.File;
-
public class KinesisSupervisorTuningConfigTest
{
private final ObjectMapper mapper;
@@ -58,7 +56,7 @@ public class KinesisSupervisorTuningConfigTest
TuningConfig.class
);
- Assert.assertNotNull(config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(1000000, config.getMaxRowsInMemory());
Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -106,7 +104,7 @@ public class KinesisSupervisorTuningConfigTest
TuningConfig.class
);
- Assert.assertEquals(new File("/tmp/xxx"),
config.getBasePersistDirectory());
+ Assert.assertNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getMaxRowsInMemory());
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index c6ce08ab2a..2772cc5794 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -29,7 +29,6 @@ import
org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
import org.joda.time.Period;
import javax.annotation.Nullable;
-import java.io.File;
@JsonTypeName("KinesisTuningConfig")
public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTuningConfig
@@ -45,7 +44,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
@JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
@JsonProperty("maxTotalRows") Long maxTotalRows,
@JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") File basePersistDirectory,
@JsonProperty("maxPendingPersists") Integer maxPendingPersists,
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
@@ -74,7 +72,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends
KinesisIndexTaskTu
maxRowsPerSegment,
maxTotalRows,
intermediatePersistPeriod,
- basePersistDirectory,
+ null,
maxPendingPersists,
indexSpec,
indexSpecForIntermediatePersists,
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 172d8eb0f1..8b11eddb7c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.indexing.TuningConfig;
@@ -48,11 +47,6 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
private static final long DEFAULT_ALERT_TIMEOUT = 0;
- private static File createNewBasePersistDirectory()
- {
- return FileUtils.createTempDir("druid-realtime-persist");
- }
-
private final AppendableIndexSpec appendableIndexSpec;
private final int maxRowsInMemory;
private final long maxBytesInMemory;
@@ -74,27 +68,26 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
private final int maxParseExceptions;
private final int maxSavedParseExceptions;
- @JsonCreator
public RealtimeAppenderatorTuningConfig(
- @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
- @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
- @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
- @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
- @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
- @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
- @JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
- @JsonProperty("basePersistDirectory") File basePersistDirectory,
- @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
- @JsonProperty("shardSpec") ShardSpec shardSpec,
- @JsonProperty("indexSpec") IndexSpec indexSpec,
- @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
- @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
- @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
- @JsonProperty("alertTimeout") Long alertTimeout,
- @JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
- @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
- @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
- @JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions
+ @Nullable AppendableIndexSpec appendableIndexSpec,
+ Integer maxRowsInMemory,
+ @Nullable Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Long maxTotalRows,
+ Period intermediatePersistPeriod,
+ File basePersistDirectory,
+ Integer maxPendingPersists,
+ ShardSpec shardSpec,
+ IndexSpec indexSpec,
+ @Nullable IndexSpec indexSpecForIntermediatePersists,
+ Boolean reportParseExceptions,
+ Long publishAndHandoffTimeout,
+ Long alertTimeout,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable Boolean logParseExceptions,
+ @Nullable Integer maxParseExceptions,
+ @Nullable Integer maxSavedParseExceptions
)
{
this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -108,7 +101,7 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
: intermediatePersistPeriod;
- this.basePersistDirectory = basePersistDirectory == null ?
createNewBasePersistDirectory() : basePersistDirectory;
+ this.basePersistDirectory = basePersistDirectory;
this.maxPendingPersists = maxPendingPersists == null ?
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
this.shardSpec = shardSpec == null ? DEFAULT_SHARD_SPEC : shardSpec;
this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@@ -142,6 +135,51 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
: logParseExceptions;
}
+ @JsonCreator
+ private RealtimeAppenderatorTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
+ @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+ @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
+ @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+ @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+ @JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
+ @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+ @JsonProperty("shardSpec") ShardSpec shardSpec,
+ @JsonProperty("indexSpec") IndexSpec indexSpec,
+ @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
+ @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+ @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
+ @JsonProperty("alertTimeout") Long alertTimeout,
+ @JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+ @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+ @JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions
+ )
+ {
+ this(
+ appendableIndexSpec,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ null,
+ maxPendingPersists,
+ shardSpec,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ reportParseExceptions,
+ publishAndHandoffTimeout,
+ alertTimeout,
+ segmentWriteOutMediumFactory,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions
+ );
+ }
+
@Override
@JsonProperty
public AppendableIndexSpec getAppendableIndexSpec()
@@ -199,10 +237,9 @@ public class RealtimeAppenderatorTuningConfig implements
AppenderatorConfig
}
@Override
- @JsonProperty
public File getBasePersistDirectory()
{
- return basePersistDirectory;
+ return Preconditions.checkNotNull(basePersistDirectory,
"basePersistDirectory not set");
}
@Override
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 2d3445f3a8..904f2820af 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -98,7 +98,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
this.intermediatePersistPeriod = intermediatePersistPeriod == null
? defaults.getIntermediatePersistPeriod()
: intermediatePersistPeriod;
- this.basePersistDirectory = defaults.getBasePersistDirectory();
+ this.basePersistDirectory = basePersistDirectory;
this.maxPendingPersists = maxPendingPersists == null ? 0 :
maxPendingPersists;
this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists
== null ?
@@ -193,7 +193,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements Appenderato
}
@Override
- @JsonProperty
public File getBasePersistDirectory()
{
return basePersistDirectory;
diff --git
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 6901fafd79..84061711b3 100644
---
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -56,11 +55,6 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
private static final long DEFAULT_ALERT_TIMEOUT = 0;
private static final String DEFAULT_DEDUP_COLUMN = null;
- private static File createNewBasePersistDirectory()
- {
- return FileUtils.createTempDir("druid-realtime-persist");
- }
-
// Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable
File basePersistDirectory)
{
@@ -71,7 +65,7 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
DEFAULT_WINDOW_PERIOD,
- basePersistDirectory == null ? createNewBasePersistDirectory() :
basePersistDirectory,
+ basePersistDirectory,
DEFAULT_VERSIONING_POLICY,
DEFAULT_REJECTION_POLICY_FACTORY,
DEFAULT_MAX_PENDING_PERSISTS,
@@ -111,28 +105,27 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
@Nullable
private final String dedupColumn;
- @JsonCreator
public RealtimeTuningConfig(
- @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
- @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
- @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
- @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
- @JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
- @JsonProperty("windowPeriod") Period windowPeriod,
- @JsonProperty("basePersistDirectory") File basePersistDirectory,
- @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
- @JsonProperty("rejectionPolicy") RejectionPolicyFactory
rejectionPolicyFactory,
- @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
- @JsonProperty("shardSpec") ShardSpec shardSpec,
- @JsonProperty("indexSpec") IndexSpec indexSpec,
- @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
- @JsonProperty("persistThreadPriority") int persistThreadPriority,
- @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
- @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
- @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
- @JsonProperty("alertTimeout") Long alertTimeout,
- @JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
- @JsonProperty("dedupColumn") @Nullable String dedupColumn
+ @Nullable AppendableIndexSpec appendableIndexSpec,
+ Integer maxRowsInMemory,
+ Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
+ Period intermediatePersistPeriod,
+ Period windowPeriod,
+ File basePersistDirectory,
+ VersioningPolicy versioningPolicy,
+ RejectionPolicyFactory rejectionPolicyFactory,
+ Integer maxPendingPersists,
+ ShardSpec shardSpec,
+ IndexSpec indexSpec,
+ @Nullable IndexSpec indexSpecForIntermediatePersists,
+ int persistThreadPriority,
+ int mergeThreadPriority,
+ Boolean reportParseExceptions,
+ Long handoffConditionTimeout,
+ Long alertTimeout,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable String dedupColumn
)
{
this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -146,8 +139,8 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
: intermediatePersistPeriod;
this.windowPeriod = windowPeriod == null ? DEFAULT_WINDOW_PERIOD :
windowPeriod;
- this.basePersistDirectory = basePersistDirectory == null ?
createNewBasePersistDirectory() : basePersistDirectory;
- this.versioningPolicy = versioningPolicy == null ?
DEFAULT_VERSIONING_POLICY : versioningPolicy;
+ this.basePersistDirectory = basePersistDirectory;
+ this.versioningPolicy = versioningPolicy;
this.rejectionPolicyFactory = rejectionPolicyFactory == null
? DEFAULT_REJECTION_POLICY_FACTORY
: rejectionPolicyFactory;
@@ -172,6 +165,52 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN :
dedupColumn;
}
+ @JsonCreator
+ private RealtimeTuningConfig(
+ @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec
appendableIndexSpec,
+ @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+ @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+ @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean
skipBytesInMemoryOverheadCheck,
+ @JsonProperty("intermediatePersistPeriod") Period
intermediatePersistPeriod,
+ @JsonProperty("windowPeriod") Period windowPeriod,
+ @JsonProperty("rejectionPolicy") RejectionPolicyFactory
rejectionPolicyFactory,
+ @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+ @JsonProperty("shardSpec") ShardSpec shardSpec,
+ @JsonProperty("indexSpec") IndexSpec indexSpec,
+ @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec
indexSpecForIntermediatePersists,
+ @JsonProperty("persistThreadPriority") int persistThreadPriority,
+ @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
+ @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+ @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+ @JsonProperty("alertTimeout") Long alertTimeout,
+ @JsonProperty("segmentWriteOutMediumFactory") @Nullable
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @JsonProperty("dedupColumn") @Nullable String dedupColumn
+ )
+ {
+ this(
+ appendableIndexSpec,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
+ intermediatePersistPeriod,
+ windowPeriod,
+ null,
+ null,
+ rejectionPolicyFactory,
+ maxPendingPersists,
+ shardSpec,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ persistThreadPriority,
+ mergeThreadPriority,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ alertTimeout,
+ segmentWriteOutMediumFactory,
+ dedupColumn
+ );
+ }
+
@Override
@JsonProperty
public AppendableIndexSpec getAppendableIndexSpec()
@@ -214,16 +253,14 @@ public class RealtimeTuningConfig implements
AppenderatorConfig
}
@Override
- @JsonProperty
public File getBasePersistDirectory()
{
- return basePersistDirectory;
+ return Preconditions.checkNotNull(basePersistDirectory,
"basePersistDirectory not set");
}
- @JsonProperty
public VersioningPolicy getVersioningPolicy()
{
- return versioningPolicy;
+ return Preconditions.checkNotNull(versioningPolicy, "versioningPolicy not
set");
}
@JsonProperty("rejectionPolicy")
diff --git
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index 72f0b8bbfe..c240cc4d35 100644
---
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -35,14 +35,6 @@ import java.util.UUID;
public class RealtimeTuningConfigTest
{
- @Test
- public void testDefaultBasePersistDirectory()
- {
- final RealtimeTuningConfig tuningConfig1 =
RealtimeTuningConfig.makeDefaultTuningConfig(null);
- final RealtimeTuningConfig tuningConfig2 =
RealtimeTuningConfig.makeDefaultTuningConfig(null);
- Assert.assertNotEquals(tuningConfig1.getBasePersistDirectory(),
tuningConfig2.getBasePersistDirectory());
- }
-
@Test
public void
testErrorMessageIsMeaningfulWhenUnableToCreateTemporaryDirectory()
{
@@ -89,7 +81,6 @@ public class RealtimeTuningConfigTest
TuningConfig.class
);
- Assert.assertNotNull(config.getBasePersistDirectory());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(0, config.getHandoffConditionTimeout());
Assert.assertEquals(0, config.getAlertTimeout());
@@ -102,7 +93,7 @@ public class RealtimeTuningConfigTest
Assert.assertEquals(0, config.getMergeThreadPriority());
Assert.assertEquals(0, config.getPersistThreadPriority());
Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());
- Assert.assertEquals(false, config.isReportParseExceptions());
+ Assert.assertFalse(config.isReportParseExceptions());
}
@Test
@@ -113,7 +104,6 @@ public class RealtimeTuningConfigTest
+ " \"maxRowsInMemory\": 100,\n"
+ " \"intermediatePersistPeriod\": \"PT1H\",\n"
+ " \"windowPeriod\": \"PT1H\",\n"
- + " \"basePersistDirectory\": \"/tmp/xxx\",\n"
+ " \"maxPendingPersists\": 100,\n"
+ " \"persistThreadPriority\": 100,\n"
+ " \"mergeThreadPriority\": 100,\n"
@@ -136,7 +126,6 @@ public class RealtimeTuningConfigTest
TuningConfig.class
);
- Assert.assertEquals("/tmp/xxx",
config.getBasePersistDirectory().toString());
Assert.assertEquals(new OnheapIncrementalIndex.Spec(),
config.getAppendableIndexSpec());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(70, config.getAlertTimeout());
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index f2d793e051..dc96e912dc 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -29,16 +29,27 @@ import
org.apache.druid.segment.realtime.plumber.NoopRejectionPolicyFactory;
import org.apache.druid.server.coordination.DataSegmentAnnouncer;
import org.easymock.EasyMock;
import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
public class AppenderatorPlumberTest
{
- private final AppenderatorPlumber plumber;
- private final StreamAppenderatorTester streamAppenderatorTester;
+ private AppenderatorPlumber plumber;
+ private StreamAppenderatorTester streamAppenderatorTester;
- public AppenderatorPlumberTest() throws Exception
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() throws Exception
{
- this.streamAppenderatorTester = new StreamAppenderatorTester(10);
+ this.streamAppenderatorTester =
+ new StreamAppenderatorTester.Builder()
+ .maxRowsInMemory(10)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .build();
DataSegmentAnnouncer segmentAnnouncer = EasyMock
.createMock(DataSegmentAnnouncer.class);
segmentAnnouncer.announceSegment(EasyMock.anyObject());
@@ -60,7 +71,7 @@ public class AppenderatorPlumberTest
EasyMock.anyObject(),
EasyMock.anyObject(),
EasyMock.anyObject())).andReturn(true).anyTimes();
-
+
RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
null,
1,
@@ -68,7 +79,7 @@ public class AppenderatorPlumberTest
null,
null,
null,
- null,
+ temporaryFolder.newFolder(),
new IntervalStartVersioningPolicy(),
new NoopRejectionPolicyFactory(),
null,
@@ -88,7 +99,6 @@ public class AppenderatorPlumberTest
tuningConfig,
streamAppenderatorTester.getMetrics(),
segmentAnnouncer, segmentPublisher,
handoffNotifier,
streamAppenderatorTester.getAppenderator());
-
}
@Test
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index b6154a6afe..476ecded7e 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -50,7 +50,9 @@ import org.joda.time.DateTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.io.IOException;
import java.util.Arrays;
@@ -100,14 +102,21 @@ public class StreamAppenderatorDriverTest extends
EasyMockSupport
private StreamAppenderatorDriver driver;
private DataSegmentKiller dataSegmentKiller;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
static {
NullHandling.initializeForTests();
}
@Before
- public void setUp()
+ public void setUp() throws Exception
{
- streamAppenderatorTester = new
StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
+ streamAppenderatorTester =
+ new StreamAppenderatorTester.Builder()
+ .maxRowsInMemory(MAX_ROWS_IN_MEMORY)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .build();
allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index bc123b1896..2e05cb9053 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -51,7 +51,9 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.LinearShardSpec;
import org.junit.Assert;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import java.util.Arrays;
import java.util.Collections;
@@ -69,10 +71,17 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
si("2001/2002", "A", 0)
);
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
@Test
public void testSimpleIngestion() throws Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(2, true)) {
+ try (final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+ .enablePushFailure(true)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
boolean thrown;
@@ -128,7 +137,10 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
committerSupplier.get(),
false
).get();
- Assert.assertEquals(ImmutableMap.of("x", "3"), (Map<String, String>)
segmentsAndCommitMetadata.getCommitMetadata());
+ Assert.assertEquals(
+ ImmutableMap.of("x", "3"),
+ (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
+ );
Assert.assertEquals(
IDENTIFIERS.subList(0, 2),
sorted(
@@ -157,15 +169,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig()
throws Exception
{
try (
- final StreamAppenderatorTester tester = new StreamAppenderatorTester(
- 100,
- 1024,
- null,
- true,
- new SimpleRowIngestionMeters(),
- true
- )
- ) {
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(1024)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+
.skipBytesInMemoryOverheadCheck(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -209,15 +219,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
public void
testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig()
throws Exception
{
try (
- final StreamAppenderatorTester tester = new StreamAppenderatorTester(
- 100,
- 1024,
- null,
- true,
- new SimpleRowIngestionMeters(),
- true
- )
- ) {
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(1024)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+
.skipBytesInMemoryOverheadCheck(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -257,7 +265,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
@Test
public void testMaxBytesInMemory() throws Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(100, 15000, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(15000)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -363,7 +377,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
@Test(expected = RuntimeException.class)
public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(100, 5180, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(5180)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -394,15 +414,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
public void
testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig()
throws Exception
{
try (
- final StreamAppenderatorTester tester = new StreamAppenderatorTester(
- 100,
- 10,
- null,
- true,
- new SimpleRowIngestionMeters(),
- true
- )
- ) {
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(10)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+
.skipBytesInMemoryOverheadCheck(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -443,7 +461,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
@Test
public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws
Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(100, 10000, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(10000)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -488,7 +512,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
@Test
public void testMaxBytesInMemoryInMultipleSinks() throws Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(100, 31100, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(31100)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -628,7 +658,13 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
@Test
public void testIgnoreMaxBytesInMemory() throws Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(100, -1, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+ .maxSizeInBytes(-1)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -676,7 +712,12 @@ public class StreamAppenderatorTest extends
InitializedNullHandlingTest
@Test
public void testMaxRowsInMemory() throws Exception
{
- try (final StreamAppenderatorTester tester = new
StreamAppenderatorTester(3, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+
.basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = new Supplier<Committer>()
@@ -727,7 +768,11 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
@Test
public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
{
- try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final AtomicInteger eventCount = new AtomicInteger(0);
final Supplier<Committer> committerSupplier = () -> {
@@ -774,7 +819,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
public void testRestoreFromDisk() throws Exception
{
final RealtimeTuningConfig tuningConfig;
- try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
tuningConfig = tester.getTuningConfig();
@@ -816,12 +866,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
appenderator.close();
- try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
- 2,
- -1,
- tuningConfig.getBasePersistDirectory(),
- true
- )) {
+ try (
+ final StreamAppenderatorTester tester2 =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+ .basePersistDirectory(tuningConfig.getBasePersistDirectory())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator2 = tester2.getAppenderator();
Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
@@ -833,7 +883,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
@Test(timeout = 60_000L)
public void testTotalRowCount() throws Exception
{
- try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
@@ -876,7 +931,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
public void testVerifyRowIngestionMetrics() throws Exception
{
final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
- try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(5)
+ .maxSizeInBytes(10000L)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .rowIngestionMeters(rowIngestionMeters)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
@@ -892,7 +953,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
@Test
public void testQueryByIntervals() throws Exception
{
- try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
@@ -1028,7 +1094,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
@Test
public void testQueryBySegments() throws Exception
{
- try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+ try (
+ final StreamAppenderatorTester tester =
+ new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+ .basePersistDirectory(temporaryFolder.newFolder())
+ .enablePushFailure(true)
+ .build()) {
final Appenderator appenderator = tester.getAppenderator();
appenderator.startJob();
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 808d217f4e..413f315f50 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -20,6 +20,7 @@
package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulatorStats;
@@ -87,57 +88,10 @@ public class StreamAppenderatorTester implements AutoCloseable
private final ObjectMapper objectMapper;
private final Appenderator appenderator;
private final ExecutorService queryExecutor;
- private final IndexIO indexIO;
- private final IndexMergerV9 indexMerger;
private final ServiceEmitter emitter;
private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
- public StreamAppenderatorTester(
- final int maxRowsInMemory
- )
- {
- this(maxRowsInMemory, -1, null, false);
- }
-
- public StreamAppenderatorTester(
- final int maxRowsInMemory,
- final boolean enablePushFailure
- )
- {
- this(maxRowsInMemory, -1, null, enablePushFailure);
- }
-
- public StreamAppenderatorTester(
- final int maxRowsInMemory,
- final long maxSizeInBytes,
- final boolean enablePushFailure
- )
- {
- this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
- }
-
- public StreamAppenderatorTester(
- final int maxRowsInMemory,
- final long maxSizeInBytes,
- final File basePersistDirectory,
- final boolean enablePushFailure
- )
- {
- this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
- }
-
- public StreamAppenderatorTester(
- final int maxRowsInMemory,
- final long maxSizeInBytes,
- final File basePersistDirectory,
- final boolean enablePushFailure,
- final RowIngestionMeters rowIngestionMeters
- )
- {
- this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
- }
-
public StreamAppenderatorTester(
final int maxRowsInMemory,
final long maxSizeInBytes,
@@ -199,7 +153,7 @@ public class StreamAppenderatorTester implements AutoCloseable
metrics = new FireDepartmentMetrics();
queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
- indexIO = new IndexIO(
+ IndexIO indexIO = new IndexIO(
objectMapper,
new ColumnConfig()
{
@@ -210,7 +164,12 @@ public class StreamAppenderatorTester implements AutoCloseable
}
}
);
- indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+
+ IndexMergerV9 indexMerger = new IndexMergerV9(
+ objectMapper,
+ indexIO,
+ OffHeapMemorySegmentWriteOutMediumFactory.instance()
+ );
emitter = new ServiceEmitter(
"test",
@@ -342,4 +301,62 @@ public class StreamAppenderatorTester implements AutoCloseable
emitter.close();
FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
}
+
+ public static class Builder
+ {
+ private int maxRowsInMemory;
+ private long maxSizeInBytes = -1;
+ private File basePersistDirectory;
+ private boolean enablePushFailure;
+ private RowIngestionMeters rowIngestionMeters;
+ private boolean skipBytesInMemoryOverheadCheck;
+
+ public Builder maxRowsInMemory(final int maxRowsInMemory)
+ {
+ this.maxRowsInMemory = maxRowsInMemory;
+ return this;
+ }
+
+ public Builder maxSizeInBytes(final long maxSizeInBytes)
+ {
+ this.maxSizeInBytes = maxSizeInBytes;
+ return this;
+ }
+
+ public Builder basePersistDirectory(final File basePersistDirectory)
+ {
+ this.basePersistDirectory = basePersistDirectory;
+ return this;
+ }
+
+ public Builder enablePushFailure(final boolean enablePushFailure)
+ {
+ this.enablePushFailure = enablePushFailure;
+ return this;
+ }
+
+ public Builder rowIngestionMeters(final RowIngestionMeters rowIngestionMeters)
+ {
+ this.rowIngestionMeters = rowIngestionMeters;
+ return this;
+ }
+
+ public Builder skipBytesInMemoryOverheadCheck(final boolean skipBytesInMemoryOverheadCheck)
+ {
+ this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
+ return this;
+ }
+
+ public StreamAppenderatorTester build()
+ {
+ return new StreamAppenderatorTester(
+ maxRowsInMemory,
+ maxSizeInBytes,
+ Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),
+ enablePushFailure,
+ rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters,
+ skipBytesInMemoryOverheadCheck
+ );
+ }
+ }
}
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index dcf697a821..eeea2a930e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -68,7 +68,9 @@ import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -119,6 +121,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
private FireDepartmentMetrics metrics;
private File tmpDir;
+ @Rule
+ public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
public RealtimePlumberSchoolTest(
RejectionPolicyFactory rejectionPolicy,
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
@@ -207,7 +212,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
null,
null,
null,
- null,
+ temporaryFolder.newFolder(),
new IntervalStartVersioningPolicy(),
rejectionPolicy,
null,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]