This is an automated email from the ASF dual-hosted git repository.

gian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new d98c808d3f Remove basePersistDirectory from tuning configs. (#13040)
d98c808d3f is described below

commit d98c808d3f087645d8dcfd516e77b0058bdfd389
Author: Gian Merlino <[email protected]>
AuthorDate: Fri Oct 21 17:25:36 2022 -0700

    Remove basePersistDirectory from tuning configs. (#13040)
    
    * Remove basePersistDirectory from tuning configs.
    
    Since the removal of CliRealtime, it serves no purpose: in production
    it is always overridden via withBasePersistDirectory, using some
    subdirectory of the task work directory.
    
    Removing this from the tuning config has a benefit beyond removing
    no-longer-needed logic: it also avoids the side effect of empty
    "druid-realtime-persist" directories getting created in the systemwide
    temp directory.
    
    * Test adjustments to appropriately set basePersistDirectory.
    
    * Remove unused import.
    
    * Fix RATC (RealtimeAppenderatorTuningConfig) constructor.
---
 .../indexing/kafka/KafkaIndexTaskTuningConfig.java |  55 ++++++-
 .../supervisor/KafkaSupervisorTuningConfig.java    |   8 +-
 .../kafka/KafkaIndexTaskTuningConfigTest.java      |  12 +-
 .../kafka/supervisor/KafkaSupervisorTest.java      |   6 -
 .../KafkaSupervisorTuningConfigTest.java           |   6 +-
 .../TestModifiedKafkaIndexTaskTuningConfig.java    |   4 +-
 .../kinesis/KinesisIndexTaskTuningConfig.java      |  93 +++++++++---
 .../supervisor/KinesisSupervisorTuningConfig.java  |   8 +-
 .../kinesis/KinesisIndexTaskTuningConfigTest.java  |  11 +-
 .../kinesis/supervisor/KinesisSupervisorTest.java  |   4 -
 .../KinesisSupervisorTuningConfigTest.java         |   6 +-
 .../TestModifiedKinesisIndexTaskTuningConfig.java  |   4 +-
 .../index/RealtimeAppenderatorTuningConfig.java    |  95 ++++++++----
 .../SeekableStreamIndexTaskTuningConfig.java       |   3 +-
 .../segment/indexing/RealtimeTuningConfig.java     | 105 ++++++++-----
 .../segment/indexing/RealtimeTuningConfigTest.java |  13 +-
 .../appenderator/AppenderatorPlumberTest.java      |  24 ++-
 .../appenderator/StreamAppenderatorDriverTest.java |  13 +-
 .../appenderator/StreamAppenderatorTest.java       | 165 +++++++++++++++------
 .../appenderator/StreamAppenderatorTester.java     | 115 ++++++++------
 .../plumber/RealtimePlumberSchoolTest.java         |   7 +-
 21 files changed, 500 insertions(+), 257 deletions(-)

diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index c0dfb334e8..be3aac9ac2 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -32,8 +32,54 @@ import java.io.File;
 
 public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningConfig
 {
-  @JsonCreator
   public KafkaIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      @Nullable Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      @Nullable Period intermediatePersistPeriod,
+      @Nullable File basePersistDirectory,
+      @Nullable Integer maxPendingPersists,
+      @Nullable IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      @Nullable Boolean reportParseExceptions,
+      @Nullable Long handoffConditionTimeout,
+      @Nullable Boolean resetOffsetAutomatically,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Period intermediateHandoffPeriod,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
+  )
+  {
+    super(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        false,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
+  @JsonCreator
+  private KafkaIndexTaskTuningConfig(
       @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec 
appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
@@ -41,7 +87,6 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") @Nullable Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File 
basePersistDirectory,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
@@ -55,7 +100,7 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
       @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions
   )
   {
-    super(
+    this(
         appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
@@ -63,14 +108,13 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
         reportParseExceptions,
         handoffConditionTimeout,
         resetOffsetAutomatically,
-        false,
         segmentWriteOutMediumFactory,
         intermediateHandoffPeriod,
         logParseExceptions,
@@ -116,7 +160,6 @@ public class KafkaIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningCon
            ", maxBytesInMemory=" + getMaxBytesInMemory() +
            ", skipBytesInMemoryOverheadCheck=" + 
isSkipBytesInMemoryOverheadCheck() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", indexSpecForIntermediatePersists=" + 
getIndexSpecForIntermediatePersists() +
diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index 4fc5489a8d..596e532924 100644
--- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -29,7 +29,6 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 public class KafkaSupervisorTuningConfig extends KafkaIndexTaskTuningConfig
     implements SeekableStreamSupervisorTuningConfig
@@ -67,7 +66,6 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
         null,
         null,
         null,
-        null,
         null
     );
   }
@@ -80,7 +78,6 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
@@ -108,7 +105,7 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
@@ -198,7 +195,6 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
            ", maxBytesInMemory=" + getMaxBytesInMemoryOrDefault() +
            ", skipBytesInMemoryOverheadCheck=" + 
isSkipBytesInMemoryOverheadCheck() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", reportParseExceptions=" + isReportParseExceptions() +
@@ -229,7 +225,7 @@ public class KafkaSupervisorTuningConfig extends 
KafkaIndexTaskTuningConfig
         getMaxRowsPerSegment(),
         getMaxTotalRows(),
         getIntermediatePersistPeriod(),
-        getBasePersistDirectory(),
+        null,
         getMaxPendingPersists(),
         getIndexSpec(),
         getIndexSpecForIntermediatePersists(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 6674c36f17..2c5bad93cb 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -61,7 +61,7 @@ public class KafkaIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -102,7 +102,7 @@ public class KafkaIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), 
config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
@@ -127,7 +127,6 @@ public class KafkaIndexTaskTuningConfigTest
         2,
         10L,
         new Period("PT3S"),
-        new File("/tmp/xxx"),
         4,
         new IndexSpec(),
         new IndexSpec(),
@@ -146,7 +145,7 @@ public class KafkaIndexTaskTuningConfigTest
         null,
         null
     );
-    KafkaIndexTaskTuningConfig copy = (KafkaIndexTaskTuningConfig) 
original.convertToTaskTuningConfig();
+    KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
 
     Assert.assertEquals(original.getAppendableIndexSpec(), 
copy.getAppendableIndexSpec());
     Assert.assertEquals(1, copy.getMaxRowsInMemory());
@@ -154,7 +153,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertNotEquals(null, copy.getMaxTotalRows());
     Assert.assertEquals(10L, copy.getMaxTotalRows().longValue());
     Assert.assertEquals(new Period("PT3S"), 
copy.getIntermediatePersistPeriod());
-    Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
+    Assert.assertNull(copy.getBasePersistDirectory());
     Assert.assertEquals(4, copy.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
     Assert.assertEquals(true, copy.isReportParseExceptions());
@@ -197,7 +196,7 @@ public class KafkaIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxRowsPerSegment(), 
deserialized.getMaxRowsPerSegment());
     Assert.assertEquals(base.getMaxTotalRows(), 
deserialized.getMaxTotalRows());
     Assert.assertEquals(base.getIntermediatePersistPeriod(), 
deserialized.getIntermediatePersistPeriod());
-    Assert.assertEquals(base.getBasePersistDirectory(), 
deserialized.getBasePersistDirectory());
+    Assert.assertNull(deserialized.getBasePersistDirectory());
     Assert.assertEquals(base.getMaxPendingPersists(), 
deserialized.getMaxPendingPersists());
     Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
     Assert.assertEquals(base.isReportParseExceptions(), 
deserialized.isReportParseExceptions());
@@ -221,7 +220,6 @@ public class KafkaIndexTaskTuningConfigTest
         2,
         10L,
         new Period("PT3S"),
-        new File("/tmp/xxx"),
         4,
         new IndexSpec(),
         new IndexSpec(),
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index c30795cdd1..78b010b3b3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -115,7 +115,6 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -313,7 +312,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
             50000,
             null,
             new Period("P1Y"),
-            new File("/test"),
             null,
             null,
             null,
@@ -3771,7 +3769,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
             50000,
             null,
             new Period("P1Y"),
-            new File("/test"),
             null,
             null,
             null,
@@ -3811,7 +3808,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -4138,7 +4134,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -4252,7 +4247,6 @@ public class KafkaSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index 7151eb0681..5ac97b12c8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -32,8 +32,6 @@ import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-
 public class KafkaSupervisorTuningConfigTest
 {
   private final ObjectMapper mapper;
@@ -59,7 +57,7 @@ public class KafkaSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -110,7 +108,7 @@ public class KafkaSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), 
config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 642141c3c4..fd6cbb48b8 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -29,7 +29,6 @@ import 
org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 @JsonTypeName("KafkaTuningConfig")
 public class TestModifiedKafkaIndexTaskTuningConfig extends 
KafkaIndexTaskTuningConfig
@@ -45,7 +44,6 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends 
KafkaIndexTaskTuning
       @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") @Nullable Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") @Nullable File 
basePersistDirectory,
       @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists,
       @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
@@ -68,7 +66,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends 
KafkaIndexTaskTuning
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
index 21c0510758..4e9dbafb7c 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfig.java
@@ -47,8 +47,72 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
   private final Integer fetchThreads;
   private final int maxRecordsPerPoll;
 
-  @JsonCreator
   public KinesisIndexTaskTuningConfig(
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      Integer maxRowsInMemory,
+      Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      Integer maxRowsPerSegment,
+      Long maxTotalRows,
+      Period intermediatePersistPeriod,
+      File basePersistDirectory,
+      Integer maxPendingPersists,
+      IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      Boolean reportParseExceptions,
+      Long handoffConditionTimeout,
+      Boolean resetOffsetAutomatically,
+      Boolean skipSequenceNumberAvailabilityCheck,
+      Integer recordBufferSize,
+      Integer recordBufferOfferTimeout,
+      Integer recordBufferFullWait,
+      Integer fetchThreads,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions,
+      @Nullable Integer maxRecordsPerPoll,
+      @Nullable Period intermediateHandoffPeriod
+  )
+  {
+    super(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        basePersistDirectory,
+        maxPendingPersists,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        resetOffsetAutomatically,
+        skipSequenceNumberAvailabilityCheck,
+        segmentWriteOutMediumFactory,
+        intermediateHandoffPeriod,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+    this.recordBufferSize = recordBufferSize == null ? 
DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
+    this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
+                                    ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
+                                    : recordBufferOfferTimeout;
+    this.recordBufferFullWait = recordBufferFullWait == null ? 
DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
+    this.fetchThreads = fetchThreads; // we handle this being null later
+    this.maxRecordsPerPoll = maxRecordsPerPoll == null ? 
DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
+
+    Preconditions.checkArgument(
+        !(super.isResetOffsetAutomatically() && 
super.isSkipSequenceNumberAvailabilityCheck()),
+        "resetOffsetAutomatically cannot be used if 
skipSequenceNumberAvailabilityCheck=true"
+    );
+  }
+
+  @JsonCreator
+  private KinesisIndexTaskTuningConfig(
       @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec 
appendableIndexSpec,
       @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
       @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
@@ -56,7 +120,6 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
@@ -76,7 +139,7 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
       @JsonProperty("intermediateHandoffPeriod") @Nullable Period 
intermediateHandoffPeriod
   )
   {
-    super(
+    this(
         appendableIndexSpec,
         maxRowsInMemory,
         maxBytesInMemory,
@@ -84,7 +147,7 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
@@ -92,23 +155,16 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
         handoffConditionTimeout,
         resetOffsetAutomatically,
         skipSequenceNumberAvailabilityCheck,
+        recordBufferSize,
+        recordBufferOfferTimeout,
+        recordBufferFullWait,
+        fetchThreads,
         segmentWriteOutMediumFactory,
-        intermediateHandoffPeriod,
         logParseExceptions,
         maxParseExceptions,
-        maxSavedParseExceptions
-    );
-    this.recordBufferSize = recordBufferSize == null ? 
DEFAULT_RECORD_BUFFER_SIZE : recordBufferSize;
-    this.recordBufferOfferTimeout = recordBufferOfferTimeout == null
-                                    ? DEFAULT_RECORD_BUFFER_OFFER_TIMEOUT
-                                    : recordBufferOfferTimeout;
-    this.recordBufferFullWait = recordBufferFullWait == null ? 
DEFAULT_RECORD_BUFFER_FULL_WAIT : recordBufferFullWait;
-    this.fetchThreads = fetchThreads; // we handle this being null later
-    this.maxRecordsPerPoll = maxRecordsPerPoll == null ? 
DEFAULT_MAX_RECORDS_PER_POLL : maxRecordsPerPoll;
-
-    Preconditions.checkArgument(
-        !(super.isResetOffsetAutomatically() && 
super.isSkipSequenceNumberAvailabilityCheck()),
-        "resetOffsetAutomatically cannot be used if 
skipSequenceNumberAvailabilityCheck=true"
+        maxSavedParseExceptions,
+        maxRecordsPerPoll,
+        intermediateHandoffPeriod
     );
   }
 
@@ -217,7 +273,6 @@ public class KinesisIndexTaskTuningConfig extends 
SeekableStreamIndexTaskTuningC
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", reportParseExceptions=" + isReportParseExceptions() +
diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
index 57bae9f2a8..ba8f4311f0 100644
--- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
+++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfig.java
@@ -29,7 +29,6 @@ import org.joda.time.Duration;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 public class KinesisSupervisorTuningConfig extends KinesisIndexTaskTuningConfig
     implements SeekableStreamSupervisorTuningConfig
@@ -79,7 +78,6 @@ public class KinesisSupervisorTuningConfig extends 
KinesisIndexTaskTuningConfig
         null,
         null,
         null,
-        null,
         null
     );
   }
@@ -92,7 +90,6 @@ public class KinesisSupervisorTuningConfig extends 
KinesisIndexTaskTuningConfig
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
@@ -129,7 +126,7 @@ public class KinesisSupervisorTuningConfig extends 
KinesisIndexTaskTuningConfig
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
@@ -239,7 +236,6 @@ public class KinesisSupervisorTuningConfig extends 
KinesisIndexTaskTuningConfig
            ", maxRowsPerSegment=" + getMaxRowsPerSegment() +
            ", maxTotalRows=" + getMaxTotalRows() +
            ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() +
-           ", basePersistDirectory=" + getBasePersistDirectory() +
            ", maxPendingPersists=" + getMaxPendingPersists() +
            ", indexSpec=" + getIndexSpec() +
            ", reportParseExceptions=" + isReportParseExceptions() +
@@ -278,7 +274,7 @@ public class KinesisSupervisorTuningConfig extends 
KinesisIndexTaskTuningConfig
         getMaxRowsPerSegment(),
         getMaxTotalRows(),
         getIntermediatePersistPeriod(),
-        getBasePersistDirectory(),
+        null,
         getMaxPendingPersists(),
         getIndexSpec(),
         getIndexSpecForIntermediatePersists(),
diff --git a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
index 9aa1f6127b..61db57449d 100644
--- a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
+++ b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTuningConfigTest.java
@@ -67,7 +67,7 @@ public class KinesisIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -115,7 +115,7 @@ public class KinesisIndexTaskTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), 
config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
@@ -173,7 +173,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxRowsPerSegment(), 
deserialized.getMaxRowsPerSegment());
     Assert.assertEquals(base.getMaxTotalRows(), 
deserialized.getMaxTotalRows());
     Assert.assertEquals(base.getIntermediatePersistPeriod(), 
deserialized.getIntermediatePersistPeriod());
-    Assert.assertEquals(base.getBasePersistDirectory(), 
deserialized.getBasePersistDirectory());
+    Assert.assertNull(deserialized.getBasePersistDirectory());
     Assert.assertEquals(base.getMaxPendingPersists(), 
deserialized.getMaxPendingPersists());
     Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
     Assert.assertEquals(base.isReportParseExceptions(), 
deserialized.isReportParseExceptions());
@@ -231,7 +231,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(base.getMaxRowsPerSegment(), 
deserialized.getMaxRowsPerSegment());
     Assert.assertEquals(base.getMaxTotalRows(), 
deserialized.getMaxTotalRows());
     Assert.assertEquals(base.getIntermediatePersistPeriod(), 
deserialized.getIntermediatePersistPeriod());
-    Assert.assertEquals(base.getBasePersistDirectory(), 
deserialized.getBasePersistDirectory());
+    Assert.assertNull(deserialized.getBasePersistDirectory());
     Assert.assertEquals(base.getMaxPendingPersists(), 
deserialized.getMaxPendingPersists());
     Assert.assertEquals(base.getIndexSpec(), deserialized.getIndexSpec());
     Assert.assertEquals(base.isReportParseExceptions(), 
deserialized.isReportParseExceptions());
@@ -286,7 +286,6 @@ public class KinesisIndexTaskTuningConfigTest
         2,
         100L,
         new Period("PT3S"),
-        new File("/tmp/xxx"),
         4,
         new IndexSpec(),
         new IndexSpec(),
@@ -322,7 +321,7 @@ public class KinesisIndexTaskTuningConfigTest
     Assert.assertEquals(2, copy.getMaxRowsPerSegment().intValue());
     Assert.assertEquals(100L, (long) copy.getMaxTotalRows());
     Assert.assertEquals(new Period("PT3S"), 
copy.getIntermediatePersistPeriod());
-    Assert.assertEquals(new File("/tmp/xxx"), copy.getBasePersistDirectory());
+    Assert.assertNull(copy.getBasePersistDirectory());
     Assert.assertEquals(4, copy.getMaxPendingPersists());
     Assert.assertEquals(new IndexSpec(), copy.getIndexSpec());
     Assert.assertTrue(copy.isReportParseExceptions());
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
index 99d8d5223b..be836567eb 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTest.java
@@ -103,7 +103,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -185,7 +184,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -3948,7 +3946,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
@@ -5061,7 +5058,6 @@ public class KinesisSupervisorTest extends EasyMockSupport
         50000,
         null,
         new Period("P1Y"),
-        new File("/test"),
         null,
         null,
         null,
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
index c043cbdea5..a5c48b35b6 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisorTuningConfigTest.java
@@ -31,8 +31,6 @@ import org.joda.time.Period;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.File;
-
 public class KinesisSupervisorTuningConfigTest
 {
   private final ObjectMapper mapper;
@@ -58,7 +56,7 @@ public class KinesisSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(1000000, config.getMaxRowsInMemory());
     Assert.assertEquals(5_000_000, config.getMaxRowsPerSegment().intValue());
@@ -106,7 +104,7 @@ public class KinesisSupervisorTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals(new File("/tmp/xxx"), 
config.getBasePersistDirectory());
+    Assert.assertNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getMaxRowsInMemory());
     Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
index c6ce08ab2a..2772cc5794 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/test/TestModifiedKinesisIndexTaskTuningConfig.java
@@ -29,7 +29,6 @@ import 
org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Period;
 
 import javax.annotation.Nullable;
-import java.io.File;
 
 @JsonTypeName("KinesisTuningConfig")
 public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTuningConfig
@@ -45,7 +44,6 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTu
       @JsonProperty("maxRowsPerSegment") Integer maxRowsPerSegment,
       @JsonProperty("maxTotalRows") Long maxTotalRows,
       @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
       @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
       @JsonProperty("indexSpec") IndexSpec indexSpec,
       @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
@@ -74,7 +72,7 @@ public class TestModifiedKinesisIndexTaskTuningConfig extends 
KinesisIndexTaskTu
         maxRowsPerSegment,
         maxTotalRows,
         intermediatePersistPeriod,
-        basePersistDirectory,
+        null,
         maxPendingPersists,
         indexSpec,
         indexSpecForIntermediatePersists,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
index 172d8eb0f1..8b11eddb7c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/index/RealtimeAppenderatorTuningConfig.java
@@ -24,7 +24,6 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.indexing.TuningConfig;
@@ -48,11 +47,6 @@ public class RealtimeAppenderatorTuningConfig implements 
AppenderatorConfig
   private static final long DEFAULT_HANDOFF_CONDITION_TIMEOUT = 0;
   private static final long DEFAULT_ALERT_TIMEOUT = 0;
 
-  private static File createNewBasePersistDirectory()
-  {
-    return FileUtils.createTempDir("druid-realtime-persist");
-  }
-
   private final AppendableIndexSpec appendableIndexSpec;
   private final int maxRowsInMemory;
   private final long maxBytesInMemory;
@@ -74,27 +68,26 @@ public class RealtimeAppenderatorTuningConfig implements 
AppenderatorConfig
   private final int maxParseExceptions;
   private final int maxSavedParseExceptions;
 
-  @JsonCreator
   public RealtimeAppenderatorTuningConfig(
-      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec 
appendableIndexSpec,
-      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
-      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
-      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean 
skipBytesInMemoryOverheadCheck,
-      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
-      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
-      @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
-      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
-      @JsonProperty("shardSpec") ShardSpec shardSpec,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
-      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
-      @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
-      @JsonProperty("alertTimeout") Long alertTimeout,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable 
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
-      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
-      @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      Integer maxRowsInMemory,
+      @Nullable Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      @Nullable Integer maxRowsPerSegment,
+      @Nullable Long maxTotalRows,
+      Period intermediatePersistPeriod,
+      File basePersistDirectory,
+      Integer maxPendingPersists,
+      ShardSpec shardSpec,
+      IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      Boolean reportParseExceptions,
+      Long publishAndHandoffTimeout,
+      Long alertTimeout,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable Boolean logParseExceptions,
+      @Nullable Integer maxParseExceptions,
+      @Nullable Integer maxSavedParseExceptions
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -108,7 +101,7 @@ public class RealtimeAppenderatorTuningConfig implements 
AppenderatorConfig
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
                                      : intermediatePersistPeriod;
-    this.basePersistDirectory = basePersistDirectory == null ? 
createNewBasePersistDirectory() : basePersistDirectory;
+    this.basePersistDirectory = basePersistDirectory;
     this.maxPendingPersists = maxPendingPersists == null ? 
DEFAULT_MAX_PENDING_PERSISTS : maxPendingPersists;
     this.shardSpec = shardSpec == null ? DEFAULT_SHARD_SPEC : shardSpec;
     this.indexSpec = indexSpec == null ? DEFAULT_INDEX_SPEC : indexSpec;
@@ -142,6 +135,51 @@ public class RealtimeAppenderatorTuningConfig implements 
AppenderatorConfig
                               : logParseExceptions;
   }
 
+  @JsonCreator
+  private RealtimeAppenderatorTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec 
appendableIndexSpec,
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory,
+      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean 
skipBytesInMemoryOverheadCheck,
+      @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment,
+      @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows,
+      @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("shardSpec") ShardSpec shardSpec,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("publishAndHandoffTimeout") Long publishAndHandoffTimeout,
+      @JsonProperty("alertTimeout") Long alertTimeout,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable 
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions,
+      @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions,
+      @JsonProperty("maxSavedParseExceptions") @Nullable Integer 
maxSavedParseExceptions
+  )
+  {
+    this(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        maxRowsPerSegment,
+        maxTotalRows,
+        intermediatePersistPeriod,
+        null,
+        maxPendingPersists,
+        shardSpec,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        reportParseExceptions,
+        publishAndHandoffTimeout,
+        alertTimeout,
+        segmentWriteOutMediumFactory,
+        logParseExceptions,
+        maxParseExceptions,
+        maxSavedParseExceptions
+    );
+  }
+
   @Override
   @JsonProperty
   public AppendableIndexSpec getAppendableIndexSpec()
@@ -199,10 +237,9 @@ public class RealtimeAppenderatorTuningConfig implements 
AppenderatorConfig
   }
 
   @Override
-  @JsonProperty
   public File getBasePersistDirectory()
   {
-    return basePersistDirectory;
+    return Preconditions.checkNotNull(basePersistDirectory, 
"basePersistDirectory not set");
   }
 
   @Override
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 2d3445f3a8..904f2820af 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -98,7 +98,7 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
     this.intermediatePersistPeriod = intermediatePersistPeriod == null
                                      ? defaults.getIntermediatePersistPeriod()
                                      : intermediatePersistPeriod;
-    this.basePersistDirectory = defaults.getBasePersistDirectory();
+    this.basePersistDirectory = basePersistDirectory;
     this.maxPendingPersists = maxPendingPersists == null ? 0 : 
maxPendingPersists;
     this.indexSpec = indexSpec == null ? defaults.getIndexSpec() : indexSpec;
     this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists 
== null ?
@@ -193,7 +193,6 @@ public abstract class SeekableStreamIndexTaskTuningConfig 
implements Appenderato
   }
 
   @Override
-  @JsonProperty
   public File getBasePersistDirectory()
   {
     return basePersistDirectory;
diff --git 
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
 
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
index 6901fafd79..84061711b3 100644
--- 
a/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
+++ 
b/server/src/main/java/org/apache/druid/segment/indexing/RealtimeTuningConfig.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import org.apache.druid.indexer.partitions.PartitionsSpec;
-import org.apache.druid.java.util.common.FileUtils;
 import org.apache.druid.segment.IndexSpec;
 import org.apache.druid.segment.incremental.AppendableIndexSpec;
 import org.apache.druid.segment.realtime.appenderator.AppenderatorConfig;
@@ -56,11 +55,6 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
   private static final long DEFAULT_ALERT_TIMEOUT = 0;
   private static final String DEFAULT_DEDUP_COLUMN = null;
 
-  private static File createNewBasePersistDirectory()
-  {
-    return FileUtils.createTempDir("druid-realtime-persist");
-  }
-
   // Might make sense for this to be a builder
   public static RealtimeTuningConfig makeDefaultTuningConfig(final @Nullable 
File basePersistDirectory)
   {
@@ -71,7 +65,7 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
         DEFAULT_SKIP_BYTES_IN_MEMORY_OVERHEAD_CHECK,
         DEFAULT_INTERMEDIATE_PERSIST_PERIOD,
         DEFAULT_WINDOW_PERIOD,
-        basePersistDirectory == null ? createNewBasePersistDirectory() : 
basePersistDirectory,
+        basePersistDirectory,
         DEFAULT_VERSIONING_POLICY,
         DEFAULT_REJECTION_POLICY_FACTORY,
         DEFAULT_MAX_PENDING_PERSISTS,
@@ -111,28 +105,27 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
   @Nullable
   private final String dedupColumn;
 
-  @JsonCreator
   public RealtimeTuningConfig(
-      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec 
appendableIndexSpec,
-      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
-      @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
-      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean 
skipBytesInMemoryOverheadCheck,
-      @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
-      @JsonProperty("windowPeriod") Period windowPeriod,
-      @JsonProperty("basePersistDirectory") File basePersistDirectory,
-      @JsonProperty("versioningPolicy") VersioningPolicy versioningPolicy,
-      @JsonProperty("rejectionPolicy") RejectionPolicyFactory 
rejectionPolicyFactory,
-      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
-      @JsonProperty("shardSpec") ShardSpec shardSpec,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
-      @JsonProperty("persistThreadPriority") int persistThreadPriority,
-      @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
-      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
-      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
-      @JsonProperty("alertTimeout") Long alertTimeout,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable 
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("dedupColumn") @Nullable String dedupColumn
+      @Nullable AppendableIndexSpec appendableIndexSpec,
+      Integer maxRowsInMemory,
+      Long maxBytesInMemory,
+      @Nullable Boolean skipBytesInMemoryOverheadCheck,
+      Period intermediatePersistPeriod,
+      Period windowPeriod,
+      File basePersistDirectory,
+      VersioningPolicy versioningPolicy,
+      RejectionPolicyFactory rejectionPolicyFactory,
+      Integer maxPendingPersists,
+      ShardSpec shardSpec,
+      IndexSpec indexSpec,
+      @Nullable IndexSpec indexSpecForIntermediatePersists,
+      int persistThreadPriority,
+      int mergeThreadPriority,
+      Boolean reportParseExceptions,
+      Long handoffConditionTimeout,
+      Long alertTimeout,
+      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @Nullable String dedupColumn
   )
   {
     this.appendableIndexSpec = appendableIndexSpec == null ? 
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
@@ -146,8 +139,8 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
                                      ? DEFAULT_INTERMEDIATE_PERSIST_PERIOD
                                      : intermediatePersistPeriod;
     this.windowPeriod = windowPeriod == null ? DEFAULT_WINDOW_PERIOD : 
windowPeriod;
-    this.basePersistDirectory = basePersistDirectory == null ? 
createNewBasePersistDirectory() : basePersistDirectory;
-    this.versioningPolicy = versioningPolicy == null ? 
DEFAULT_VERSIONING_POLICY : versioningPolicy;
+    this.basePersistDirectory = basePersistDirectory;
+    this.versioningPolicy = versioningPolicy;
     this.rejectionPolicyFactory = rejectionPolicyFactory == null
                                   ? DEFAULT_REJECTION_POLICY_FACTORY
                                   : rejectionPolicyFactory;
@@ -172,6 +165,52 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
     this.dedupColumn = dedupColumn == null ? DEFAULT_DEDUP_COLUMN : 
dedupColumn;
   }
 
+  @JsonCreator
+  private RealtimeTuningConfig(
+      @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec 
appendableIndexSpec,
+      @JsonProperty("maxRowsInMemory") Integer maxRowsInMemory,
+      @JsonProperty("maxBytesInMemory") Long maxBytesInMemory,
+      @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean 
skipBytesInMemoryOverheadCheck,
+      @JsonProperty("intermediatePersistPeriod") Period 
intermediatePersistPeriod,
+      @JsonProperty("windowPeriod") Period windowPeriod,
+      @JsonProperty("rejectionPolicy") RejectionPolicyFactory 
rejectionPolicyFactory,
+      @JsonProperty("maxPendingPersists") Integer maxPendingPersists,
+      @JsonProperty("shardSpec") ShardSpec shardSpec,
+      @JsonProperty("indexSpec") IndexSpec indexSpec,
+      @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec 
indexSpecForIntermediatePersists,
+      @JsonProperty("persistThreadPriority") int persistThreadPriority,
+      @JsonProperty("mergeThreadPriority") int mergeThreadPriority,
+      @JsonProperty("reportParseExceptions") Boolean reportParseExceptions,
+      @JsonProperty("handoffConditionTimeout") Long handoffConditionTimeout,
+      @JsonProperty("alertTimeout") Long alertTimeout,
+      @JsonProperty("segmentWriteOutMediumFactory") @Nullable 
SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+      @JsonProperty("dedupColumn") @Nullable String dedupColumn
+  )
+  {
+    this(
+        appendableIndexSpec,
+        maxRowsInMemory,
+        maxBytesInMemory,
+        skipBytesInMemoryOverheadCheck,
+        intermediatePersistPeriod,
+        windowPeriod,
+        null,
+        null,
+        rejectionPolicyFactory,
+        maxPendingPersists,
+        shardSpec,
+        indexSpec,
+        indexSpecForIntermediatePersists,
+        persistThreadPriority,
+        mergeThreadPriority,
+        reportParseExceptions,
+        handoffConditionTimeout,
+        alertTimeout,
+        segmentWriteOutMediumFactory,
+        dedupColumn
+    );
+  }
+
   @Override
   @JsonProperty
   public AppendableIndexSpec getAppendableIndexSpec()
@@ -214,16 +253,14 @@ public class RealtimeTuningConfig implements 
AppenderatorConfig
   }
 
   @Override
-  @JsonProperty
   public File getBasePersistDirectory()
   {
-    return basePersistDirectory;
+    return Preconditions.checkNotNull(basePersistDirectory, 
"basePersistDirectory not set");
   }
 
-  @JsonProperty
   public VersioningPolicy getVersioningPolicy()
   {
-    return versioningPolicy;
+    return Preconditions.checkNotNull(versioningPolicy, "versioningPolicy not 
set");
   }
 
   @JsonProperty("rejectionPolicy")
diff --git 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
index 72f0b8bbfe..c240cc4d35 100644
--- 
a/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/indexing/RealtimeTuningConfigTest.java
@@ -35,14 +35,6 @@ import java.util.UUID;
 
 public class RealtimeTuningConfigTest
 {
-  @Test
-  public void testDefaultBasePersistDirectory()
-  {
-    final RealtimeTuningConfig tuningConfig1 = 
RealtimeTuningConfig.makeDefaultTuningConfig(null);
-    final RealtimeTuningConfig tuningConfig2 = 
RealtimeTuningConfig.makeDefaultTuningConfig(null);
-    Assert.assertNotEquals(tuningConfig1.getBasePersistDirectory(), 
tuningConfig2.getBasePersistDirectory());
-  }
-
   @Test
   public void 
testErrorMessageIsMeaningfulWhenUnableToCreateTemporaryDirectory()
   {
@@ -89,7 +81,6 @@ public class RealtimeTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertNotNull(config.getBasePersistDirectory());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(0, config.getHandoffConditionTimeout());
     Assert.assertEquals(0, config.getAlertTimeout());
@@ -102,7 +93,7 @@ public class RealtimeTuningConfigTest
     Assert.assertEquals(0, config.getMergeThreadPriority());
     Assert.assertEquals(0, config.getPersistThreadPriority());
     Assert.assertEquals(new Period("PT10M"), config.getWindowPeriod());
-    Assert.assertEquals(false, config.isReportParseExceptions());
+    Assert.assertFalse(config.isReportParseExceptions());
   }
 
   @Test
@@ -113,7 +104,6 @@ public class RealtimeTuningConfigTest
                      + "  \"maxRowsInMemory\": 100,\n"
                      + "  \"intermediatePersistPeriod\": \"PT1H\",\n"
                      + "  \"windowPeriod\": \"PT1H\",\n"
-                     + "  \"basePersistDirectory\": \"/tmp/xxx\",\n"
                      + "  \"maxPendingPersists\": 100,\n"
                      + "  \"persistThreadPriority\": 100,\n"
                      + "  \"mergeThreadPriority\": 100,\n"
@@ -136,7 +126,6 @@ public class RealtimeTuningConfigTest
         TuningConfig.class
     );
 
-    Assert.assertEquals("/tmp/xxx", 
config.getBasePersistDirectory().toString());
     Assert.assertEquals(new OnheapIncrementalIndex.Spec(), 
config.getAppendableIndexSpec());
     Assert.assertEquals(100, config.getHandoffConditionTimeout());
     Assert.assertEquals(70, config.getAlertTimeout());
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
index f2d793e051..dc96e912dc 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorPlumberTest.java
@@ -29,16 +29,27 @@ import 
org.apache.druid.segment.realtime.plumber.NoopRejectionPolicyFactory;
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.easymock.EasyMock;
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class AppenderatorPlumberTest
 {
-  private final AppenderatorPlumber plumber;
-  private final StreamAppenderatorTester streamAppenderatorTester;
+  private AppenderatorPlumber plumber;
+  private StreamAppenderatorTester streamAppenderatorTester;
 
-  public AppenderatorPlumberTest() throws Exception
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws Exception
   {
-    this.streamAppenderatorTester = new StreamAppenderatorTester(10);
+    this.streamAppenderatorTester =
+        new StreamAppenderatorTester.Builder()
+            .maxRowsInMemory(10)
+            .basePersistDirectory(temporaryFolder.newFolder())
+            .build();
     DataSegmentAnnouncer segmentAnnouncer = EasyMock
         .createMock(DataSegmentAnnouncer.class);
     segmentAnnouncer.announceSegment(EasyMock.anyObject());
@@ -60,7 +71,7 @@ public class AppenderatorPlumberTest
                 EasyMock.anyObject(),
                 EasyMock.anyObject(),
                 EasyMock.anyObject())).andReturn(true).anyTimes();
-    
+
     RealtimeTuningConfig tuningConfig = new RealtimeTuningConfig(
         null,
         1,
@@ -68,7 +79,7 @@ public class AppenderatorPlumberTest
         null,
         null,
         null,
-        null,
+        temporaryFolder.newFolder(),
         new IntervalStartVersioningPolicy(),
         new NoopRejectionPolicyFactory(),
         null,
@@ -88,7 +99,6 @@ public class AppenderatorPlumberTest
                                            tuningConfig, 
streamAppenderatorTester.getMetrics(),
                                            segmentAnnouncer, segmentPublisher, 
handoffNotifier,
                                            
streamAppenderatorTester.getAppenderator());
-
   }
 
   @Test
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index b6154a6afe..476ecded7e 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -50,7 +50,9 @@ import org.joda.time.DateTime;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -100,14 +102,21 @@ public class StreamAppenderatorDriverTest extends 
EasyMockSupport
   private StreamAppenderatorDriver driver;
   private DataSegmentKiller dataSegmentKiller;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   static {
     NullHandling.initializeForTests();
   }
 
   @Before
-  public void setUp()
+  public void setUp() throws Exception
   {
-    streamAppenderatorTester = new 
StreamAppenderatorTester(MAX_ROWS_IN_MEMORY);
+    streamAppenderatorTester =
+        new StreamAppenderatorTester.Builder()
+            .maxRowsInMemory(MAX_ROWS_IN_MEMORY)
+            .basePersistDirectory(temporaryFolder.newFolder())
+            .build();
     allocator = new TestSegmentAllocator(DATA_SOURCE, Granularities.HOUR);
     segmentHandoffNotifierFactory = new TestSegmentHandoffNotifierFactory();
     dataSegmentKiller = createStrictMock(DataSegmentKiller.class);
diff --git 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
index bc123b1896..2e05cb9053 100644
--- 
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
+++ 
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTest.java
@@ -51,7 +51,9 @@ import org.apache.druid.testing.InitializedNullHandlingTest;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -69,10 +71,17 @@ public class StreamAppenderatorTest extends 
InitializedNullHandlingTest
       si("2001/2002", "A", 0)
   );
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   @Test
   public void testSimpleIngestion() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new 
StreamAppenderatorTester(2, true)) {
+    try (final StreamAppenderatorTester tester =
+             new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                   .enablePushFailure(true)
+                                                   
.basePersistDirectory(temporaryFolder.newFolder())
+                                                   .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       boolean thrown;
 
@@ -128,7 +137,10 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
           committerSupplier.get(),
           false
       ).get();
-      Assert.assertEquals(ImmutableMap.of("x", "3"), (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata());
+      Assert.assertEquals(
+          ImmutableMap.of("x", "3"),
+          (Map<String, String>) segmentsAndCommitMetadata.getCommitMetadata()
+      );
       Assert.assertEquals(
           IDENTIFIERS.subList(0, 2),
           sorted(
@@ -157,15 +169,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testMaxBytesInMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
-            100,
-            1024,
-            null,
-            true,
-            new SimpleRowIngestionMeters(),
-            true
-        )
-    ) {
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(1024)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .skipBytesInMemoryOverheadCheck(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -209,15 +219,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testMaxBytesInMemoryInMultipleSinksWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
-            100,
-            1024,
-            null,
-            true,
-            new SimpleRowIngestionMeters(),
-            true
-        )
-    ) {
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(1024)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .skipBytesInMemoryOverheadCheck(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -257,7 +265,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxBytesInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 15000, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(15000)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -363,7 +377,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test(expected = RuntimeException.class)
   public void testTaskFailAsPersistCannotFreeAnyMoreMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 5180, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(5180)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -394,15 +414,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testTaskDoesNotFailAsExceededMemoryWithSkipBytesInMemoryOverheadCheckConfig() throws Exception
   {
     try (
-        final StreamAppenderatorTester tester = new StreamAppenderatorTester(
-            100,
-            10,
-            null,
-            true,
-            new SimpleRowIngestionMeters(),
-            true
-        )
-    ) {
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(10)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .skipBytesInMemoryOverheadCheck(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -443,7 +461,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testTaskCleanupInMemoryCounterAfterCloseWithRowInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 10000, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(10000)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -488,7 +512,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxBytesInMemoryInMultipleSinks() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, 31100, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(31100)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -628,7 +658,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testIgnoreMaxBytesInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(100, -1, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(100)
+                                                  .maxSizeInBytes(-1)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -676,7 +712,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxRowsInMemory() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = new Supplier<Committer>()
@@ -727,7 +768,11 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testMaxRowsInMemoryDisallowIncrementalPersists() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, false)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final AtomicInteger eventCount = new AtomicInteger(0);
       final Supplier<Committer> committerSupplier = () -> {
@@ -774,7 +819,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testRestoreFromDisk() throws Exception
   {
     final RealtimeTuningConfig tuningConfig;
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       tuningConfig = tester.getTuningConfig();
 
@@ -816,12 +866,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "bob", 5), committerSupplier);
       appenderator.close();
 
-      try (final StreamAppenderatorTester tester2 = new StreamAppenderatorTester(
-          2,
-          -1,
-          tuningConfig.getBasePersistDirectory(),
-          true
-      )) {
+      try (
+          final StreamAppenderatorTester tester2 =
+              new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                    .basePersistDirectory(tuningConfig.getBasePersistDirectory())
+                                                    .enablePushFailure(true)
+                                                    .build()) {
         final Appenderator appenderator2 = tester2.getAppenderator();
         Assert.assertEquals(ImmutableMap.of("eventCount", 4), appenderator2.startJob());
         Assert.assertEquals(ImmutableList.of(IDENTIFIERS.get(0)), appenderator2.getSegments());
@@ -833,7 +883,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test(timeout = 60_000L)
   public void testTotalRowCount() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(3, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(3)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       final ConcurrentMap<String, String> commitMetadata = new ConcurrentHashMap<>();
       final Supplier<Committer> committerSupplier = committerSupplierFromConcurrentMap(commitMetadata);
@@ -876,7 +931,13 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   public void testVerifyRowIngestionMetrics() throws Exception
   {
     final RowIngestionMeters rowIngestionMeters = new SimpleRowIngestionMeters();
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(5, 10000L, null, false, rowIngestionMeters)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(5)
+                                                  .maxSizeInBytes(10000L)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .rowIngestionMeters(rowIngestionMeters)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
       appenderator.startJob();
       appenderator.add(IDENTIFIERS.get(0), ir("2000", "foo", "invalid_met"), Committers.nilSupplier());
@@ -892,7 +953,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testQueryByIntervals() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
 
       appenderator.startJob();
@@ -1028,7 +1094,12 @@ public class StreamAppenderatorTest extends InitializedNullHandlingTest
   @Test
   public void testQueryBySegments() throws Exception
   {
-    try (final StreamAppenderatorTester tester = new StreamAppenderatorTester(2, true)) {
+    try (
+        final StreamAppenderatorTester tester =
+            new StreamAppenderatorTester.Builder().maxRowsInMemory(2)
+                                                  .basePersistDirectory(temporaryFolder.newFolder())
+                                                  .enablePushFailure(true)
+                                                  .build()) {
       final Appenderator appenderator = tester.getAppenderator();
 
       appenderator.startJob();
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
index 808d217f4e..413f315f50 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorTester.java
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.realtime.appenderator;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
@@ -87,57 +88,10 @@ public class StreamAppenderatorTester implements AutoCloseable
   private final ObjectMapper objectMapper;
   private final Appenderator appenderator;
   private final ExecutorService queryExecutor;
-  private final IndexIO indexIO;
-  private final IndexMergerV9 indexMerger;
   private final ServiceEmitter emitter;
 
   private final List<DataSegment> pushedSegments = new CopyOnWriteArrayList<>();
 
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory
-  )
-  {
-    this(maxRowsInMemory, -1, null, false);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final boolean enablePushFailure
-  )
-  {
-    this(maxRowsInMemory, -1, null, enablePushFailure);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final boolean enablePushFailure
-  )
-  {
-    this(maxRowsInMemory, maxSizeInBytes, null, enablePushFailure);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final File basePersistDirectory,
-      final boolean enablePushFailure
-  )
-  {
-    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, new SimpleRowIngestionMeters(), false);
-  }
-
-  public StreamAppenderatorTester(
-      final int maxRowsInMemory,
-      final long maxSizeInBytes,
-      final File basePersistDirectory,
-      final boolean enablePushFailure,
-      final RowIngestionMeters rowIngestionMeters
-  )
-  {
-    this(maxRowsInMemory, maxSizeInBytes, basePersistDirectory, enablePushFailure, rowIngestionMeters, false);
-  }
-
   public StreamAppenderatorTester(
       final int maxRowsInMemory,
       final long maxSizeInBytes,
@@ -199,7 +153,7 @@ public class StreamAppenderatorTester implements AutoCloseable
     metrics = new FireDepartmentMetrics();
     queryExecutor = Execs.singleThreaded("queryExecutor(%d)");
 
-    indexIO = new IndexIO(
+    IndexIO indexIO = new IndexIO(
         objectMapper,
         new ColumnConfig()
         {
@@ -210,7 +164,12 @@ public class StreamAppenderatorTester implements AutoCloseable
           }
         }
     );
-    indexMerger = new IndexMergerV9(objectMapper, indexIO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
+
+    IndexMergerV9 indexMerger = new IndexMergerV9(
+        objectMapper,
+        indexIO,
+        OffHeapMemorySegmentWriteOutMediumFactory.instance()
+    );
 
     emitter = new ServiceEmitter(
         "test",
@@ -342,4 +301,62 @@ public class StreamAppenderatorTester implements AutoCloseable
     emitter.close();
     FileUtils.deleteDirectory(tuningConfig.getBasePersistDirectory());
   }
+
+  public static class Builder
+  {
+    private int maxRowsInMemory;
+    private long maxSizeInBytes = -1;
+    private File basePersistDirectory;
+    private boolean enablePushFailure;
+    private RowIngestionMeters rowIngestionMeters;
+    private boolean skipBytesInMemoryOverheadCheck;
+
+    public Builder maxRowsInMemory(final int maxRowsInMemory)
+    {
+      this.maxRowsInMemory = maxRowsInMemory;
+      return this;
+    }
+
+    public Builder maxSizeInBytes(final long maxSizeInBytes)
+    {
+      this.maxSizeInBytes = maxSizeInBytes;
+      return this;
+    }
+
+    public Builder basePersistDirectory(final File basePersistDirectory)
+    {
+      this.basePersistDirectory = basePersistDirectory;
+      return this;
+    }
+
+    public Builder enablePushFailure(final boolean enablePushFailure)
+    {
+      this.enablePushFailure = enablePushFailure;
+      return this;
+    }
+
+    public Builder rowIngestionMeters(final RowIngestionMeters rowIngestionMeters)
+    {
+      this.rowIngestionMeters = rowIngestionMeters;
+      return this;
+    }
+
+    public Builder skipBytesInMemoryOverheadCheck(final boolean skipBytesInMemoryOverheadCheck)
+    {
+      this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
+      return this;
+    }
+
+    public StreamAppenderatorTester build()
+    {
+      return new StreamAppenderatorTester(
+          maxRowsInMemory,
+          maxSizeInBytes,
+          Preconditions.checkNotNull(basePersistDirectory, "basePersistDirectory"),
+          enablePushFailure,
+          rowIngestionMeters == null ? new SimpleRowIngestionMeters() : rowIngestionMeters,
+          skipBytesInMemoryOverheadCheck
+      );
+    }
+  }
 }
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index dcf697a821..eeea2a930e 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -68,7 +68,9 @@ import org.joda.time.Interval;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
@@ -119,6 +121,9 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
   private FireDepartmentMetrics metrics;
   private File tmpDir;
 
+  @Rule
+  public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
   public RealtimePlumberSchoolTest(
       RejectionPolicyFactory rejectionPolicy,
       SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
@@ -207,7 +212,7 @@ public class RealtimePlumberSchoolTest extends InitializedNullHandlingTest
         null,
         null,
         null,
-        null,
+        temporaryFolder.newFolder(),
         new IntervalStartVersioningPolicy(),
         rejectionPolicy,
         null,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to