fjy closed pull request #6393: Remove ConvertSegmentTask, HadoopConverterTask, and ConvertSegmentBackwardsCompatibleTask
URL: https://github.com/apache/incubator-druid/pull/6393
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
index 57b782ae347..6a01efd413e 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/FilterPartitionBenchmark.java
@@ -23,6 +23,7 @@
 import com.google.common.base.Predicate;
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -73,7 +74,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.joda.time.Interval;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -132,7 +132,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
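
Every benchmark hunk in this diff makes the same mechanical change: IndexIO loses its SegmentWriteOutMediumFactory constructor argument (presumably because IndexIO no longer writes segments once conversion is removed), while IndexMergerV9, which still writes segments, keeps the factory. A minimal sketch of the resulting setup, using the Druid classes named in the diff; the holder class is hypothetical, and the `() -> 0` ColumnConfig lambda mirrors what IndexPersistBenchmark does below:

```java
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexIO;
import org.apache.druid.segment.IndexMergerV9;
import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;

// Hypothetical holder mirroring the static setup blocks changed in this diff.
class BenchmarkIndexSetup
{
  private static final DefaultObjectMapper JSON_MAPPER = new DefaultObjectMapper();

  // After this PR, IndexIO takes only the mapper and a ColumnConfig; ColumnConfig
  // reduces to a single method here, so a lambda returning the column cache size works.
  static final IndexIO INDEX_IO = new IndexIO(JSON_MAPPER, () -> 0);

  // Writing segments still needs a write-out medium, so IndexMergerV9 keeps the factory.
  static final IndexMergerV9 INDEX_MERGER_V9 =
      new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
}
```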
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
index adb19debb90..172bce90034 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/FilteredAggregatorBenchmark.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -71,7 +72,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -127,7 +127,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
index 5b9901ef933..419cfe42faf 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/GroupByTypeInterfaceBenchmark.java
@@ -26,6 +26,7 @@
 import com.google.common.base.Throwables;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -73,7 +74,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -148,7 +148,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
index d5d97a46eb3..e580ad455f1 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/TopNTypeInterfaceBenchmark.java
@@ -128,7 +128,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java
index dd339072584..1fb709ec44c 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/datagen/SegmentGenerator.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.impl.DimensionSchema;
 import org.apache.druid.data.input.impl.DimensionsSpec;
@@ -33,7 +34,6 @@
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesSerde;
 import org.apache.druid.segment.IndexBuilder;
@@ -43,8 +43,8 @@
 import org.apache.druid.segment.TestHelper;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.timeline.DataSegment;
-import org.apache.commons.io.FileUtils;
 
 import java.io.Closeable;
 import java.io.File;
@@ -145,7 +145,7 @@ public QueryableIndex generate(
       return Iterables.getOnlyElement(indexes);
     } else {
       try {
-        final QueryableIndex merged = TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()).loadIndex(
+        final QueryableIndex merged = TestHelper.getTestIndexIO().loadIndex(
             TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance()).merge(
                 indexes.stream().map(QueryableIndexIndexableAdapter::new).collect(Collectors.toList()),
                 false,
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
index 3120daf841a..f994be6ba60 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexMergeBenchmark.java
@@ -22,6 +22,7 @@
 import com.fasterxml.jackson.databind.InjectableValues;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -40,7 +41,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -96,7 +96,6 @@
     JSON_MAPPER.setInjectableValues(injectableValues);
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
index 5f43ef8a753..75ae9b0192c 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/indexing/IndexPersistBenchmark.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -36,7 +37,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -73,7 +73,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         () -> 0
     );
     INDEX_MERGER_V9 = new IndexMergerV9(JSON_MAPPER, INDEX_IO, OffHeapMemorySegmentWriteOutMediumFactory.instance());
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/GroupByBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
index 84237ee6013..10bee8d8e6f 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/GroupByBenchmark.java
@@ -29,6 +29,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -82,7 +83,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -157,7 +157,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SearchBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SearchBenchmark.java
index 5e3c6bed209..c945c964ac9 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SearchBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SearchBenchmark.java
@@ -26,6 +26,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -77,7 +78,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -140,7 +140,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java
index abee67d5a65..766b8c18f0f 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/SelectBenchmark.java
@@ -25,6 +25,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -67,7 +68,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -133,7 +133,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
index d26cfebdf0f..5d9066532af 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/TimeseriesBenchmark.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -72,7 +73,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -132,7 +132,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/TopNBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/TopNBenchmark.java
index 4d883e91769..a1cddc735a6 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/TopNBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/TopNBenchmark.java
@@ -23,6 +23,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -69,7 +70,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -133,7 +133,6 @@
     JSON_MAPPER = new DefaultObjectMapper();
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/benchmarks/src/main/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java b/benchmarks/src/main/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
index 13318b5d453..36b450278cc 100644
--- a/benchmarks/src/main/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
+++ b/benchmarks/src/main/java/org/apache/druid/benchmark/query/timecompare/TimeCompareBenchmark.java
@@ -23,6 +23,7 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.io.Files;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.benchmark.datagen.BenchmarkDataGenerator;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemaInfo;
 import org.apache.druid.benchmark.datagen.BenchmarkSchemas;
@@ -75,7 +76,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.serde.ComplexMetrics;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.joda.time.Interval;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
@@ -156,7 +156,6 @@
 
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/docs/content/configuration/index.md b/docs/content/configuration/index.md
index dcde2f50a62..1d3a8e1c44c 100644
--- a/docs/content/configuration/index.md
+++ b/docs/content/configuration/index.md
@@ -665,7 +665,6 @@ These coordinator static configurations can be defined in the `coordinator/runti
 |`druid.coordinator.period.indexingPeriod`|How often to send compact/merge/conversion tasks to the indexing service. It's recommended to be longer than `druid.manager.segments.pollDuration`|PT1800S (30 mins)|
 |`druid.coordinator.startDelay`|The operation of the Coordinator works on the assumption that it has an up-to-date view of the state of the world when it runs; the current ZK interaction code, however, is written in a way that doesn’t allow the Coordinator to know for a fact that it’s done loading the current state of the world. This delay is a hack to give it enough time to believe that it has all the data.|PT300S|
 |`druid.coordinator.merge.on`|Boolean flag for whether or not the coordinator should try and merge small segments into a more optimal segment size.|false|
-|`druid.coordinator.conversion.on`|Boolean flag for converting old segment indexing versions to the latest segment indexing version.|false|
 |`druid.coordinator.load.timeout`|The timeout duration for when the coordinator assigns a segment to a historical node.|PT15M|
 |`druid.coordinator.kill.pendingSegments.on`|Boolean flag for whether or not the coordinator cleans up old entries in the `pendingSegments` table of the metadata store. If set to true, the coordinator will check the created time of the most recently completed task. If it doesn't exist, it finds the created time of the earliest running/pending/waiting task. Once the created time is found, then for all dataSources not in the `killPendingSegmentsSkipList` (see [Dynamic configuration](#dynamic-configuration)), the coordinator will ask the overlord to clean up the entries 1 day or more older than the found created time in the `pendingSegments` table. This will be done periodically based on the `druid.coordinator.period` specified.|false|
 |`druid.coordinator.kill.on`|Boolean flag for whether or not the coordinator should submit kill tasks for unused segments, that is, hard delete them from the metadata store and deep storage. If set to true, then for all whitelisted dataSources (or optionally all), the coordinator will submit tasks periodically based on the `period` specified. These kill tasks will delete all segments except for the last `durationToRetain` period. Whitelist or All can be set via the dynamic configuration `killAllDataSources` and `killDataSourceWhitelist` described later.|false|
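
The deleted `druid.coordinator.conversion.on` flag drove automatic conversion of old segments; with the convert tasks gone, rewriting segments with a new indexSpec is done by reindexing, for instance via the `compact` task type that remains registered in Task.java later in this diff. A hedged sketch of such a spec, assuming the compaction task accepts an `indexSpec` inside its `tuningConfig`; the field layout is illustrative, not taken from this PR:

```json
{
  "type": "compact",
  "dataSource": "some_datasource",
  "interval": "2013/2015",
  "tuningConfig": {
    "type": "index",
    "indexSpec": {
      "bitmap": { "type": "roaring" },
      "dimensionCompression": "lz4",
      "metricCompression": "lz4"
    }
  }
}
```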
diff --git a/docs/content/ingestion/misc-tasks.md b/docs/content/ingestion/misc-tasks.md
index 481e90f8752..f0755ce5e99 100644
--- a/docs/content/ingestion/misc-tasks.md
+++ b/docs/content/ingestion/misc-tasks.md
@@ -4,95 +4,6 @@ layout: doc_page
 
 # Miscellaneous Tasks
 
-## Version Converter Task
-
-The convert task suite takes active segments and will recompress them using a new IndexSpec. This is handy when doing activities like migrating from Concise to Roaring, or adding dimension compression to old segments.
-
-Upon success the new segments will have the same version as the old segment with `_converted` appended. A convert task may be run against the same interval for the same datasource multiple times. Each execution will append another `_converted` to the version for the segments.
-
-There are two types of conversion tasks. One is the Hadoop convert task, and the other is the indexing service convert task. The Hadoop convert task runs on a Hadoop cluster, and simply leaves a task monitor on the indexing service (similar to the Hadoop batch task). The indexing service convert task runs the actual conversion on the indexing service.
-
-### Hadoop Convert Segment Task
-
-```json
-{
-  "type": "hadoop_convert_segment",
-  "dataSource":"some_datasource",
-  "interval":"2013/2015",
-  "indexSpec":{"bitmap":{"type":"concise"},"dimensionCompression":"lz4","metricCompression":"lz4"},
-  "force": true,
-  "validate": false,
-  "distributedSuccessCache":"hdfs://some-hdfs-nn:9000/user/jobrunner/cache",
-  "jobPriority":"VERY_LOW",
-  "segmentOutputPath":"s3n://somebucket/somekeyprefix"
-}
-```
-
-The values are described below.
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|`type`|String|Convert task identifier|Yes: `hadoop_convert_segment`|
-|`dataSource`|String|The datasource to search for segments|Yes|
-|`interval`|Interval string|The interval in the datasource to look for segments|Yes|
-|`indexSpec`|json|The compression specification for the index|Yes|
-|`force`|boolean|Forces the convert task to continue even if binary versions indicate it has been updated recently (you probably want to do this)|No|
-|`validate`|boolean|Runs validation between the old and new segment before reporting task success|No|
-|`distributedSuccessCache`|URI|A location where hadoop should put intermediary files.|Yes|
-|`jobPriority`|`org.apache.hadoop.mapred.JobPriority` as String|The priority to set for the hadoop job|No|
-|`segmentOutputPath`|URI|A base uri for the segment to be placed. Same format as other places a segment output path is needed|Yes|
-
-### Indexing Service Convert Segment Task
-
-```json
-{
-  "type": "convert_segment",
-  "dataSource":"some_datasource",
-  "interval":"2013/2015",
-  "indexSpec":{"bitmap":{"type":"concise"},"dimensionCompression":"lz4","metricCompression":"lz4"},
-  "force": true,
-  "validate": false
-}
-```
-
-|Field|Type|Description|Required (default)|
-|-----|----|-----------|--------|
-|`type`|String|Convert task identifier|Yes: `convert_segment`|
-|`dataSource`|String|The datasource to search for segments|Yes|
-|`interval`|Interval string|The interval in the datasource to look for segments|Yes|
-|`indexSpec`|json|The compression specification for the index|Yes|
-|`force`|boolean|Forces the convert task to continue even if binary versions indicate it has been updated recently (you probably want to do this)|No (false)|
-|`validate`|boolean|Runs validation between the old and new segment before reporting task success|No (true)|
-
-Unlike the hadoop convert task, the indexing service task draws its output path from the indexing service's configuration.
-
-#### IndexSpec
-
-The indexSpec defines segment storage format options to be used at indexing time, such as bitmap type and column
-compression formats. The indexSpec is optional and default parameters will be used if not specified.
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|bitmap|Object|Compression format for bitmap indexes. Should be a JSON object; see below for options.|no (defaults to Concise)|
-|dimensionCompression|String|Compression format for dimension columns. Choose from `LZ4`, `LZF`, or `uncompressed`.|no (default == `LZ4`)|
-|metricCompression|String|Compression format for metric columns. Choose from `LZ4`, `LZF`, `uncompressed`, or `none`.|no (default == `LZ4`)|
-|longEncoding|String|Encoding format for metric and dimension columns with type long. Choose from `auto` or `longs`. `auto` encodes the values using offset or lookup table depending on column cardinality, and store them with variable size. `longs` stores the value as is with 8 bytes each.|no (default == `longs`)|
-
-##### Bitmap types
-
-For Concise bitmaps:
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|Must be `concise`.|yes|
-
-For Roaring bitmaps:
-
-|Field|Type|Description|Required|
-|-----|----|-----------|--------|
-|type|String|Must be `roaring`.|yes|
-|compressRunOnSerialization|Boolean|Use a run-length encoding where it is estimated as more space efficient.|no (default == `true`)|
-
 ## Noop Task
 
 These tasks start, sleep for a time and are used only for testing. The available grammar is:
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java
deleted file mode 100644
index 7a506e2c5c5..00000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ConvertSegmentBackwardsCompatibleTask.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-
-@Deprecated
-public class ConvertSegmentBackwardsCompatibleTask extends ConvertSegmentTask
-{
-  @JsonCreator
-  public ConvertSegmentBackwardsCompatibleTask(
-      @JsonProperty("id") String id,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval,
-      @JsonProperty("segment") DataSegment segment,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("force") Boolean force,
-      @JsonProperty("validate") Boolean validate,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
-  )
-  {
-    super(
-        id == null ? ConvertSegmentTask.makeId(dataSource, interval) : id,
-        dataSource,
-        interval,
-        segment,
-        indexSpec,
-        force == null ? false : force,
-        validate == null ? false : validate,
-        segmentWriteOutMediumFactory,
-        null
-    );
-  }
-
-  @Deprecated
-  public static class SubTask extends ConvertSegmentTask.SubTask
-  {
-    @JsonCreator
-    public SubTask(
-        @JsonProperty("groupId") String groupId,
-        @JsonProperty("segment") DataSegment segment,
-        @JsonProperty("indexSpec") IndexSpec indexSpec,
-        @JsonProperty("force") Boolean force,
-        @JsonProperty("validate") Boolean validate,
-        @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
-    )
-    {
-      super(groupId, segment, indexSpec, force, validate, segmentWriteOutMediumFactory, null);
-    }
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ConvertSegmentTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ConvertSegmentTask.java
deleted file mode 100644
index f9dfeeaf0a8..00000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/ConvertSegmentTask.java
+++ /dev/null
@@ -1,452 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.SegmentInsertAction;
-import org.apache.druid.indexing.common.actions.SegmentListUsedAction;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.StringUtils;
-import org.apache.druid.java.util.common.guava.FunctionalIterable;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.loading.SegmentLoadingException;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This task takes a segment and attempts to reindex it in the latest version with the specified indexSpec.
- * <p/>
- * Only datasource must be specified. `indexSpec` and `force` are highly suggested but optional. The rest get
- * auto-configured and should only be modified with great care
- */
-public class ConvertSegmentTask extends AbstractFixedIntervalTask
-{
-  private static final String TYPE = "convert_segment";
-  private static final Integer CURR_VERSION_INTEGER = IndexIO.CURRENT_VERSION_ID;
-
-  private static final Logger log = new Logger(ConvertSegmentTask.class);
-
-  /**
-   * Create a segment converter task to convert a segment to the most recent version including the specified indexSpec
-   *
-   * @param dataSource The datasource to which this update should be applied
-   * @param interval   The interval in the datasource which to apply the update to
-   * @param indexSpec  The IndexSpec to use in the updated segments
-   * @param force      Force an update, even if the task thinks it doesn't need to update.
-   * @param validate   Validate the new segment compared to the old segment on a row by row basis
-   *
-   * @return A SegmentConverterTask for the datasource's interval with the indexSpec specified.
-   */
-  public static ConvertSegmentTask create(
-      String dataSource,
-      Interval interval,
-      IndexSpec indexSpec,
-      boolean force,
-      boolean validate,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      Map<String, Object> context
-  )
-  {
-    final String id = makeId(dataSource, interval);
-    return new ConvertSegmentTask(
-        id,
-        dataSource,
-        interval,
-        null,
-        indexSpec,
-        force,
-        validate,
-        segmentWriteOutMediumFactory,
-        context
-    );
-  }
-
-  /**
-   * Create a task to update the segment specified to the most recent binary version with the specified indexSpec
-   *
-   * @param segment   The segment to which this update should be applied
-   * @param indexSpec The IndexSpec to use in the updated segments
-   * @param force     Force an update, even if the task thinks it doesn't need to update.
-   * @param validate  Validate the new segment compared to the old segment on a row by row basis
-   *
-   * @return A SegmentConverterTask for the segment with the indexSpec specified.
-   */
-  public static ConvertSegmentTask create(
-      DataSegment segment,
-      IndexSpec indexSpec,
-      boolean force,
-      boolean validate,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      Map<String, Object> context
-  )
-  {
-    final Interval interval = segment.getInterval();
-    final String dataSource = segment.getDataSource();
-    final String id = makeId(dataSource, interval);
-    return new ConvertSegmentTask(
-        id,
-        dataSource,
-        interval,
-        segment,
-        indexSpec,
-        force,
-        validate,
-        segmentWriteOutMediumFactory,
-        context
-    );
-  }
-
-  protected static String makeId(String dataSource, Interval interval)
-  {
-    Preconditions.checkNotNull(dataSource, "dataSource");
-    Preconditions.checkNotNull(interval, "interval");
-    return joinId(TYPE, dataSource, interval.getStart(), interval.getEnd(), DateTimes.nowUtc());
-  }
-
-  @JsonCreator
-  private static ConvertSegmentTask createFromJson(
-      @JsonProperty("id") String id,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval,
-      @JsonProperty("segment") DataSegment segment,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("force") Boolean force,
-      @JsonProperty("validate") Boolean validate,
-      @JsonProperty("context") Map<String, Object> context,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
-  )
-  {
-    final boolean isForce = force == null ? false : force;
-    final boolean isValidate = validate == null ? true : validate;
-    if (id == null) {
-      if (segment == null) {
-        return create(dataSource, interval, indexSpec, isForce, isValidate, segmentWriteOutMediumFactory, context);
-      } else {
-        return create(segment, indexSpec, isForce, isValidate, segmentWriteOutMediumFactory, context);
-      }
-    }
-    return new ConvertSegmentTask(
-        id,
-        dataSource,
-        interval,
-        segment,
-        indexSpec,
-        isForce,
-        isValidate,
-        segmentWriteOutMediumFactory,
-        context
-    );
-  }
-
-  @JsonIgnore
-  private final DataSegment segment;
-  private final IndexSpec indexSpec;
-  private final boolean force;
-  private final boolean validate;
-  @Nullable
-  private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
-
-  ConvertSegmentTask(
-      String id,
-      String dataSource,
-      Interval interval,
-      DataSegment segment,
-      IndexSpec indexSpec,
-      boolean force,
-      boolean validate,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      Map<String, Object> context
-  )
-  {
-    super(id, dataSource, interval, context);
-    this.segment = segment;
-    this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
-    this.force = force;
-    this.validate = validate;
-    this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
-  }
-
-  @JsonProperty
-  public boolean isForce()
-  {
-    return force;
-  }
-
-  @JsonProperty
-  public boolean isValidate()
-  {
-    return validate;
-  }
-
-  @JsonProperty
-  public IndexSpec getIndexSpec()
-  {
-    return indexSpec;
-  }
-
-  @Override
-  public String getType()
-  {
-    return TYPE;
-  }
-
-  @JsonProperty
-  public DataSegment getSegment()
-  {
-    return segment;
-  }
-
-  @JsonProperty
-  @Nullable
-  public SegmentWriteOutMediumFactory getSegmentWriteOutMediumFactory()
-  {
-    return segmentWriteOutMediumFactory;
-  }
-
-  @Override
-  public TaskStatus run(TaskToolbox toolbox) throws Exception
-  {
-    final Iterable<DataSegment> segmentsToUpdate;
-    if (segment == null) {
-      final List<DataSegment> segments = toolbox.getTaskActionClient().submit(
-          new SegmentListUsedAction(
-              getDataSource(),
-              getInterval(),
-              null
-          )
-      );
-      segmentsToUpdate = FunctionalIterable
-          .create(segments)
-          .filter(
-              new Predicate<DataSegment>()
-              {
-                @Override
-                public boolean apply(DataSegment segment)
-                {
-                  final Integer segmentVersion = segment.getBinaryVersion();
-                  if (!CURR_VERSION_INTEGER.equals(segmentVersion)) {
-                    return true;
-                  } else if (force) {
-                    log.info(
-                        "Segment[%s] already at version[%s], forcing conversion",
-                        segment.getIdentifier(),
-                        segmentVersion
-                    );
-                    return true;
-                  } else {
-                    log.info("Skipping[%s], already version[%s]", segment.getIdentifier(), segmentVersion);
-                    return false;
-                  }
-                }
-              }
-          );
-    } else {
-      log.info("I'm in a subless mood.");
-      segmentsToUpdate = Collections.singleton(segment);
-    }
-    // Vestigial from a past time when this task spawned subtasks.
-    for (final Task subTask : generateSubTasks(getGroupId(), segmentsToUpdate, indexSpec, force, validate, getContext())) {
-      final TaskStatus status = subTask.run(toolbox);
-      if (!status.isSuccess()) {
-        return TaskStatus.fromCode(getId(), status.getStatusCode());
-      }
-    }
-    return success();
-  }
-
-  protected Iterable<Task> generateSubTasks(
-      final String groupId,
-      final Iterable<DataSegment> segments,
-      final IndexSpec indexSpec,
-      final boolean force,
-      final boolean validate,
-      final Map<String, Object> context
-  )
-  {
-    return Iterables.transform(
-        segments,
-        new Function<DataSegment, Task>()
-        {
-          @Override
-          public Task apply(DataSegment input)
-          {
-            return new SubTask(groupId, input, indexSpec, force, validate, segmentWriteOutMediumFactory, context);
-          }
-        }
-    );
-  }
-
-  @Override
-  public boolean equals(Object o)
-  {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-
-    ConvertSegmentTask that = (ConvertSegmentTask) o;
-
-    if (segment != null ? !segment.equals(that.segment) : that.segment != null) {
-      return false;
-    }
-
-    return super.equals(o);
-  }
-
-  public static class SubTask extends AbstractFixedIntervalTask
-  {
-    @JsonIgnore
-    private final DataSegment segment;
-    private final IndexSpec indexSpec;
-    private final boolean force;
-    private final boolean validate;
-    @Nullable
-    private final SegmentWriteOutMediumFactory segmentWriteOutMediumFactory;
-
-    @JsonCreator
-    public SubTask(
-        @JsonProperty("groupId") String groupId,
-        @JsonProperty("segment") DataSegment segment,
-        @JsonProperty("indexSpec") IndexSpec indexSpec,
-        @JsonProperty("force") Boolean force,
-        @JsonProperty("validate") Boolean validate,
-        @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-        @JsonProperty("context") Map<String, Object> context
-    )
-    {
-      super(
-          joinId(
-              groupId,
-              "sub",
-              segment.getInterval().getStart(),
-              segment.getInterval().getEnd(),
-              segment.getShardSpec().getPartitionNum()
-          ),
-          groupId,
-          segment.getDataSource(),
-          segment.getInterval(),
-          context
-      );
-      this.segment = segment;
-      this.indexSpec = indexSpec == null ? new IndexSpec() : indexSpec;
-      this.force = force == null ? false : force;
-      this.validate = validate == null ? true : validate;
-      this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
-    }
-
-    @JsonProperty
-    public boolean isValidate()
-    {
-      return validate;
-    }
-
-    @JsonProperty
-    public boolean isForce()
-    {
-      return force;
-    }
-
-    @JsonProperty
-    public DataSegment getSegment()
-    {
-      return segment;
-    }
-
-    @Override
-    public String getType()
-    {
-      return "version_converter_sub";
-    }
-
-    @Override
-    public TaskStatus run(TaskToolbox toolbox) throws Exception
-    {
-      log.info("Subs are good!  Italian BMT and Meatball are probably my 
favorite.");
-      try {
-        convertSegment(toolbox);
-      }
-      catch (Exception e) {
-        log.error(e, "Conversion failed.");
-        throw e;
-      }
-      return success();
-    }
-
-    private void convertSegment(TaskToolbox toolbox) throws SegmentLoadingException, IOException
-    {
-      log.info("Converting segment[%s]", segment);
-      final TaskActionClient actionClient = toolbox.getTaskActionClient();
-      final List<DataSegment> currentSegments = actionClient.submit(
-          new SegmentListUsedAction(segment.getDataSource(), segment.getInterval(), null)
-      );
-
-      for (DataSegment currentSegment : currentSegments) {
-        final String version = currentSegment.getVersion();
-        final Integer binaryVersion = currentSegment.getBinaryVersion();
-
-        if (!force && (version.startsWith(segment.getVersion()) && 
CURR_VERSION_INTEGER.equals(binaryVersion))) {
-          log.info("Skipping already updated segment[%s].", segment);
-          return;
-        }
-      }
-
-      final Map<DataSegment, File> localSegments = 
toolbox.fetchSegments(Collections.singletonList(segment));
-
-      final File location = localSegments.get(segment);
-      final File outLocation = new File(location, "v9_out");
-      IndexIO indexIO = toolbox.getIndexIO();
-      if (indexIO.convertSegment(location, outLocation, indexSpec, force, validate, segmentWriteOutMediumFactory)) {
-        final int outVersion = IndexIO.getVersionFromDir(outLocation);
-
-        // Appending to the version makes a new version that inherits most comparability parameters of the original
-        // version, but is "newer" than said original version.
-        DataSegment updatedSegment = segment.withVersion(StringUtils.format("%s_v%s", segment.getVersion(), outVersion));
-
-        updatedSegment = toolbox.getSegmentPusher().push(outLocation, updatedSegment, false);
-
-        actionClient.submit(new SegmentInsertAction(Sets.newHashSet(updatedSegment)));
-      } else {
-        log.info("Conversion failed.");
-      }
-    }
-  }
-}
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopConverterTask.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopConverterTask.java
deleted file mode 100644
index 7fd5dd1dbdc..00000000000
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/HadoopConverterTask.java
+++ /dev/null
@@ -1,289 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonIgnore;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import org.apache.druid.indexer.updater.HadoopConverterJob;
-import org.apache.druid.indexer.updater.HadoopDruidConverterConfig;
-import org.apache.druid.indexer.TaskStatus;
-import org.apache.druid.indexing.common.TaskToolbox;
-import org.apache.druid.indexing.common.actions.TaskActionClient;
-import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.java.util.common.logger.Logger;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-
-import javax.annotation.Nullable;
-import java.io.IOException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-public class HadoopConverterTask extends ConvertSegmentTask
-{
-  private static final String TYPE = "hadoop_convert_segment";
-  private static final Logger log = new Logger(HadoopConverterTask.class);
-
-  private final List<String> hadoopDependencyCoordinates;
-  private final URI distributedSuccessCache;
-  private final String jobPriority;
-  private final String segmentOutputPath;
-  private final String classpathPrefix;
-
-  @JsonCreator
-  public HadoopConverterTask(
-      @JsonProperty("id") String id,
-      @JsonProperty("dataSource") String dataSource,
-      @JsonProperty("interval") Interval interval,
-      @JsonProperty("indexSpec") IndexSpec indexSpec,
-      @JsonProperty("force") boolean force,
-      @JsonProperty("validate") Boolean validate,
-      @JsonProperty("hadoopDependencyCoordinates") List<String> hadoopDependencyCoordinates,
-      @JsonProperty("distributedSuccessCache") URI distributedSuccessCache,
-      @JsonProperty("jobPriority") String jobPriority,
-      @JsonProperty("segmentOutputPath") String segmentOutputPath,
-      @JsonProperty("classpathPrefix") String classpathPrefix,
-      @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
-      @JsonProperty("context") Map<String, Object> context
-  )
-  {
-    super(
-        getOrMakeId(
-            id,
-            TYPE,
-            Preconditions.checkNotNull(dataSource, "dataSource"),
-            Preconditions.checkNotNull(interval, "interval")
-        ),
-        dataSource,
-        interval,
-        null, // Always call subtask codepath
-        indexSpec,
-        force,
-        validate == null ? true : validate,
-        segmentWriteOutMediumFactory,
-        context
-    );
-    this.hadoopDependencyCoordinates = hadoopDependencyCoordinates;
-    this.distributedSuccessCache = Preconditions.checkNotNull(distributedSuccessCache, "distributedSuccessCache");
-    this.segmentOutputPath = Preconditions.checkNotNull(segmentOutputPath, "segmentOutputPath");
-    this.jobPriority = jobPriority;
-    this.classpathPrefix = classpathPrefix;
-  }
-
-  @JsonProperty
-  public List<String> getHadoopDependencyCoordinates()
-  {
-    return hadoopDependencyCoordinates;
-  }
-
-  @JsonProperty
-  public URI getDistributedSuccessCache()
-  {
-    return distributedSuccessCache;
-  }
-
-  @JsonProperty
-  public String getJobPriority()
-  {
-    return jobPriority;
-  }
-
-  @JsonProperty
-  public String getSegmentOutputPath()
-  {
-    return segmentOutputPath;
-  }
-
-  @Override
-  @JsonProperty
-  public String getClasspathPrefix()
-  {
-    return classpathPrefix;
-  }
-
-  @Override
-  protected Iterable<Task> generateSubTasks(
-      final String groupId,
-      final Iterable<DataSegment> segments,
-      final IndexSpec indexSpec,
-      final boolean force,
-      final boolean validate,
-      Map<String, Object> context
-  )
-  {
-    return Collections.singleton(
-        new ConverterSubTask(
-            ImmutableList.copyOf(segments),
-            this,
-            context
-        )
-    );
-  }
-
-  @Override
-  @JsonIgnore
-  public DataSegment getSegment()
-  {
-    throw new UOE(
-        "Sub-less data segment not supported for hadoop converter task. 
Specify interval and datasource instead"
-    );
-  }
-
-  @Override
-  public String getType()
-  {
-    return TYPE;
-  }
-
-  public static class ConverterSubTask extends HadoopTask
-  {
-    private final List<DataSegment> segments;
-    private final HadoopConverterTask parent;
-
-    @JsonCreator
-    public ConverterSubTask(
-        @JsonProperty("segments") List<DataSegment> segments,
-        @JsonProperty("parent") HadoopConverterTask parent,
-        @JsonProperty("context") Map<String, Object> context
-    )
-    {
-      super(
-          joinId(
-              Preconditions.checkNotNull(parent, "parent").getGroupId(),
-              "sub",
-              parent.getInterval().getStart(),
-              parent.getInterval().getEnd()
-          ),
-          parent.getDataSource(),
-          parent.getHadoopDependencyCoordinates(),
-          context
-      );
-      this.segments = segments;
-      this.parent = parent;
-    }
-
-    @JsonProperty
-    public List<DataSegment> getSegments()
-    {
-      return segments;
-    }
-
-    @JsonProperty
-    public HadoopConverterTask getParent()
-    {
-      return parent;
-    }
-
-    @Override
-    public String getType()
-    {
-      return TYPE + "_sub";
-    }
-
-    @Override
-    public boolean isReady(TaskActionClient taskActionClient)
-    {
-      return true;
-    }
-
-    @Override
-    public TaskStatus run(TaskToolbox toolbox) throws Exception
-    {
-      final Map<String, String> hadoopProperties = new HashMap<>();
-      final Properties properties = injector.getInstance(Properties.class);
-      for (String name : properties.stringPropertyNames()) {
-        if (name.startsWith("hadoop.")) {
-          hadoopProperties.put(name.substring("hadoop.".length()), properties.getProperty(name));
-        }
-      }
-      final ClassLoader loader = buildClassLoader(toolbox);
-      final HadoopDruidConverterConfig config = new HadoopDruidConverterConfig(
-          getDataSource(),
-          parent.getInterval(),
-          parent.getIndexSpec(),
-          segments,
-          parent.isValidate(),
-          parent.getDistributedSuccessCache(),
-          hadoopProperties,
-          parent.getJobPriority(),
-          parent.getSegmentOutputPath()
-      );
-
-      final String finishedSegmentString = invokeForeignLoader(
-          "org.apache.druid.indexing.common.task.HadoopConverterTask$JobInvoker",
-          new String[]{HadoopDruidConverterConfig.jsonMapper.writeValueAsString(config)},
-          loader
-      );
-      if (finishedSegmentString == null) {
-        return TaskStatus.failure(getId());
-      }
-      final List<DataSegment> finishedSegments = 
HadoopDruidConverterConfig.jsonMapper.readValue(
-          finishedSegmentString,
-          new TypeReference<List<DataSegment>>()
-          {
-          }
-      );
-      log.debug("Found new segments %s", 
Arrays.toString(finishedSegments.toArray()));
-      toolbox.publishSegments(finishedSegments);
-      return success();
-    }
-  }
-
-  /** Called indirectly in {@link ConverterSubTask#run(TaskToolbox)}. */
-  @SuppressWarnings("unused")
-  public static class JobInvoker
-  {
-    public static String runTask(String[] input)
-    {
-      final HadoopDruidConverterConfig config;
-      try {
-        config = HadoopDruidConverterConfig.jsonMapper.readValue(
-            input[0],
-            HadoopDruidConverterConfig.class
-        );
-      }
-      catch (IOException e) {
-        throw Throwables.propagate(e);
-      }
-      final HadoopConverterJob hadoopConverterJob = new HadoopConverterJob(config);
-      try {
-        final List<DataSegment> result = hadoopConverterJob.run();
-        return result == null
-               ? null
-               : HadoopDruidConverterConfig.jsonMapper.writeValueAsString(result);
-      }
-      catch (IOException e) {
-        throw Throwables.propagate(e);
-      }
-    }
-  }
-}
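
The deleted ConverterSubTask.run() above illustrates the classloader bridge that HadoopTask-based tasks use: the job config is flattened to a JSON string, a static runTask(String[]) on the foreign-loaded JobInvoker class is invoked reflectively, and only a String crosses back, so no Druid or Hadoop classes leak across loaders. A minimal sketch of that bridge under those assumptions (the real helper is HadoopTask.invokeForeignLoader; this stand-in is illustrative):

```java
import java.lang.reflect.Method;

final class ForeignLoaderBridge
{
  // Illustrative stand-in for HadoopTask.invokeForeignLoader: resolve the class
  // inside the isolated loader, call its static runTask(String[]), and pass only
  // Strings in both directions so nothing from either classpath crosses over.
  static String invokeForeignLoader(String className, String[] args, ClassLoader loader)
      throws Exception
  {
    final ClassLoader previous = Thread.currentThread().getContextClassLoader();
    try {
      Thread.currentThread().setContextClassLoader(loader);
      final Class<?> clazz = Class.forName(className, true, loader);
      final Method runTask = clazz.getMethod("runTask", String[].class);
      // The String[] must be wrapped in Object[] so it is passed as one argument.
      return (String) runTask.invoke(null, new Object[]{args});
    }
    finally {
      Thread.currentThread().setContextClassLoader(previous);
    }
  }
}
```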
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 793f64212f1..3ed45482cc4 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -55,15 +55,9 @@
     @JsonSubTypes.Type(name = ParallelIndexSupervisorTask.TYPE, value = ParallelIndexSupervisorTask.class),
     @JsonSubTypes.Type(name = ParallelIndexSubTask.TYPE, value = ParallelIndexSubTask.class),
     @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
-    @JsonSubTypes.Type(name = "hadoop_convert_segment", value = HadoopConverterTask.class),
-    @JsonSubTypes.Type(name = "hadoop_convert_segment_sub", value = HadoopConverterTask.ConverterSubTask.class),
     @JsonSubTypes.Type(name = "index_realtime", value = RealtimeIndexTask.class),
     @JsonSubTypes.Type(name = "index_realtime_appenderator", value = AppenderatorDriverRealtimeIndexTask.class),
     @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
-    @JsonSubTypes.Type(name = "version_converter", value = ConvertSegmentBackwardsCompatibleTask.class), // Backwards compat - Deprecated
-    @JsonSubTypes.Type(name = "version_converter_sub", value = ConvertSegmentBackwardsCompatibleTask.SubTask.class), // backwards compat - Deprecated
-    @JsonSubTypes.Type(name = "convert_segment", value = ConvertSegmentTask.class),
-    @JsonSubTypes.Type(name = "convert_segment_sub", value = ConvertSegmentTask.SubTask.class),
     @JsonSubTypes.Type(name = "same_interval_merge", value = SameIntervalMergeTask.class),
     @JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
 })
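
The @JsonSubTypes table above is how Jackson maps a task payload's "type" field to a concrete Task class, so removing the four converter entries makes those payloads fail fast at deserialization rather than being silently ignored. A hedged sketch of the behavior, assuming a mapper configured the way the indexing-service tests configure theirs:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.indexing.common.TestUtils;
    import org.apache.druid.indexing.common.task.Task;

    public class TaskTypeResolutionSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new TestUtils().getTestObjectMapper();
        // "noop" is still registered, so this resolves to NoopTask.
        Task noop = mapper.readValue("{\"type\": \"noop\"}", Task.class);
        System.out.println(noop.getType());
        // A payload with "type": "convert_segment" (or "version_converter",
        // "hadoop_convert_segment", ...) now fails with Jackson's unknown
        // type id error, since the subtype is no longer registered.
      }
    }
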
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index fbe1d0274fc..a71545ebd7d 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -63,7 +63,6 @@ public TestUtils()
     this.jsonMapper = new DefaultObjectMapper();
     indexIO = new IndexIO(
         jsonMapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
index 381b51c549a..0b49ef79a51 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/CompactionTaskTest.java
@@ -762,7 +762,7 @@ private static void assertIngestionSchema(
         Map<DataSegment, File> segmentFileMap
     )
     {
-      super(mapper, OffHeapMemorySegmentWriteOutMediumFactory.instance(), () -> 0);
+      super(mapper, () -> 0);
 
       queryableIndexMap = new HashMap<>(segmentFileMap.size());
       for (Entry<DataSegment, File> entry : segmentFileMap.entrySet()) {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ConvertSegmentTaskTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ConvertSegmentTaskTest.java
deleted file mode 100644
index de99f22fefb..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/ConvertSegmentTaskTest.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- */
-public class ConvertSegmentTaskTest
-{
-  private final ObjectMapper jsonMapper;
-
-  public ConvertSegmentTaskTest()
-  {
-    TestUtils testUtils = new TestUtils();
-    jsonMapper = testUtils.getTestObjectMapper();
-  }
-
-  @Test
-  public void testSerializationSimple() throws Exception
-  {
-    final String dataSource = "billy";
-    DateTime start = DateTimes.nowUtc();
-    final Interval interval = new Interval(start.minus(1000), start);
-
-    ConvertSegmentTask task = ConvertSegmentTask.create(dataSource, interval, null, false, true, null, null);
-
-    Task task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
-    Assert.assertEquals(task, task2);
-
-    DataSegment segment = new DataSegment(
-        dataSource,
-        interval,
-        DateTimes.nowUtc().toString(),
-        ImmutableMap.of(),
-        ImmutableList.of(),
-        ImmutableList.of(),
-        NoneShardSpec.instance(),
-        9,
-        102937
-    );
-
-    task = ConvertSegmentTask.create(segment, null, false, true, null, null);
-
-    task2 = jsonMapper.readValue(jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(task), Task.class);
-    Assert.assertEquals(task, task2);
-  }
-
-  @Test
-  public void testdeSerializationFromJsonString() throws Exception
-  {
-    String json = "{\n"
-                  + "  \"type\" : \"convert_segment\",\n"
-                  + "  \"dataSource\" : \"billy\",\n"
-                  + "  \"interval\" : \"2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z\"\n"
-                  + "}";
-    ConvertSegmentTask task = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class);
-    Assert.assertEquals("billy", task.getDataSource());
-    Assert.assertEquals(Intervals.of("2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z"), task.getInterval());
-  }
-
-  @Test
-  public void testSerdeBackwardsCompatible() throws Exception
-  {
-    String json = "{\n"
-                  + "  \"type\" : \"version_converter\",\n"
-                  + "  \"dataSource\" : \"billy\",\n"
-                  + "  \"interval\" : \"2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z\"\n"
-                  + "}";
-    ConvertSegmentTask task = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class);
-    Assert.assertEquals("billy", task.getDataSource());
-    Assert.assertEquals(Intervals.of("2015-08-27T00:00:00.000Z/2015-08-28T00:00:00.000Z"), task.getInterval());
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopConverterTaskSerDeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopConverterTaskSerDeTest.java
deleted file mode 100644
index ebeb3b7082a..00000000000
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/HadoopConverterTaskSerDeTest.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.indexing.common.task;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import org.apache.druid.indexing.common.TestUtils;
-import org.apache.druid.java.util.common.Intervals;
-import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.ConciseBitmapSerdeFactory;
-import org.apache.druid.timeline.DataSegment;
-import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.apache.druid.timeline.partition.ShardSpec;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.List;
-import java.util.Map;
-
-public class HadoopConverterTaskSerDeTest
-{
-
-  private static final String TASK_ID = "task id";
-  private static final String DATA_SOURCE = "datasource";
-  private static final Interval INTERVAL = Intervals.of("2010/2011");
-  private static final String SEGMENT_VERSION = "some version";
-  private static final Map<String, Object> LOAD_SPEC = ImmutableMap.of("someKey", "someVal");
-  private static final List<String> DIMENSIONS = ImmutableList.of("dim1", "dim2");
-  private static final List<String> METRICS = ImmutableList.of("metric1", "metric2");
-  private static final ShardSpec SHARD_SPEC = NoneShardSpec.instance();
-  private static final int BINARY_VERSION = 34718;
-  private static final long SEGMENT_SIZE = 7483901348790L;
-  private static final IndexSpec INDEX_SPEC = new IndexSpec(new ConciseBitmapSerdeFactory(),
-                                                            CompressionStrategy.LZ4,
-                                                            CompressionStrategy.LZF,
-                                                            CompressionFactory.LongEncodingStrategy.LONGS);
-  private static final DataSegment DATA_SEGMENT = new DataSegment(
-      DATA_SOURCE,
-      INTERVAL,
-      SEGMENT_VERSION,
-      LOAD_SPEC,
-      DIMENSIONS,
-      METRICS,
-      SHARD_SPEC,
-      BINARY_VERSION,
-      SEGMENT_SIZE
-  );
-  private static final List<String> HADOOP_DEPENDENCY = ImmutableList.of("dependency1");
-  private static final URI DISTRIBUTED_CACHE = URI.create("http://your.momma"); // Should have plenty of space
-  private static final String PRIORITY = "0";
-  private static final String OUTPUT_PATH = "/dev/null";
-  private static final String CLASSPATH_PREFIX = "something:where:I:need:stuff";
-
-  private final ObjectMapper jsonMapper;
-
-  public HadoopConverterTaskSerDeTest()
-  {
-    TestUtils testUtils = new TestUtils();
-    jsonMapper = testUtils.getTestObjectMapper();
-  }
-
-  @Test
-  public void testSimpleConverterTaskSerDe() throws IOException
-  {
-    HadoopConverterTask orig = new HadoopConverterTask(
-        TASK_ID,
-        DATA_SOURCE,
-        INTERVAL,
-        INDEX_SPEC,
-        true,
-        true,
-        HADOOP_DEPENDENCY,
-        DISTRIBUTED_CACHE,
-        PRIORITY,
-        OUTPUT_PATH,
-        CLASSPATH_PREFIX,
-        null,
-        null
-    );
-    final String strOrig = jsonMapper.writeValueAsString(orig);
-    HadoopConverterTask other = jsonMapper.readValue(strOrig, HadoopConverterTask.class);
-    Assert.assertEquals(strOrig, jsonMapper.writeValueAsString(other));
-    Assert.assertFalse(orig == other);
-    Assert.assertEquals(orig, other);
-    assertExpectedTask(other);
-  }
-
-  @Test
-  public void testSimpleSubTaskSerDe() throws IOException
-  {
-    HadoopConverterTask parent = new HadoopConverterTask(
-        TASK_ID,
-        DATA_SOURCE,
-        INTERVAL,
-        INDEX_SPEC,
-        true,
-        true,
-        HADOOP_DEPENDENCY,
-        DISTRIBUTED_CACHE,
-        PRIORITY,
-        OUTPUT_PATH,
-        CLASSPATH_PREFIX,
-        null,
-        null
-    );
-    HadoopConverterTask.ConverterSubTask subTask = new HadoopConverterTask.ConverterSubTask(
-        ImmutableList.of(
-            DATA_SEGMENT
-        ),
-        parent,
-        null
-    );
-    final String origString = jsonMapper.writeValueAsString(subTask);
-    final HadoopConverterTask.ConverterSubTask otherSub = jsonMapper.readValue(
-        origString,
-        HadoopConverterTask.ConverterSubTask.class
-    );
-    Assert.assertEquals(subTask, otherSub);
-    Assert.assertEquals(origString, jsonMapper.writeValueAsString(otherSub));
-    Assert.assertEquals(ImmutableList.of(DATA_SEGMENT), otherSub.getSegments());
-    Assert.assertFalse(parent == otherSub.getParent());
-    Assert.assertEquals(parent, otherSub.getParent());
-
-    assertExpectedTask(otherSub.getParent());
-  }
-
-  private static void assertExpectedTask(HadoopConverterTask other)
-  {
-    Assert.assertEquals(TASK_ID, other.getId());
-    Assert.assertEquals(DATA_SOURCE, other.getDataSource());
-    Assert.assertEquals(INTERVAL, other.getInterval());
-    Assert.assertEquals(INDEX_SPEC, other.getIndexSpec());
-    Assert.assertTrue(other.isForce());
-    Assert.assertTrue(other.isValidate());
-    Assert.assertEquals(HADOOP_DEPENDENCY, other.getHadoopDependencyCoordinates());
-    Assert.assertEquals(DISTRIBUTED_CACHE, other.getDistributedSuccessCache());
-    Assert.assertEquals(PRIORITY, other.getJobPriority());
-    Assert.assertEquals(OUTPUT_PATH, other.getSegmentOutputPath());
-    Assert.assertEquals(CLASSPATH_PREFIX, other.getClasspathPrefix());
-  }
-
-  @Test
-  public void testSubTask()
-  {
-    HadoopConverterTask parent = new HadoopConverterTask(
-        TASK_ID,
-        DATA_SOURCE,
-        INTERVAL,
-        INDEX_SPEC,
-        true,
-        true,
-        HADOOP_DEPENDENCY,
-        DISTRIBUTED_CACHE,
-        PRIORITY,
-        OUTPUT_PATH,
-        CLASSPATH_PREFIX,
-        null,
-        null
-    );
-    HadoopConverterTask.ConverterSubTask subTask = new HadoopConverterTask.ConverterSubTask(
-        ImmutableList.of(
-            DATA_SEGMENT
-        ),
-        parent,
-        null
-    );
-    Assert.assertEquals(parent.getType(), "hadoop_convert_segment");
-    Assert.assertEquals(parent.getType() + "_sub", subTask.getType());
-  }
-
-  @Test
-  public void testNullValidate()
-  {
-    HadoopConverterTask orig = new HadoopConverterTask(
-        TASK_ID,
-        DATA_SOURCE,
-        INTERVAL,
-        INDEX_SPEC,
-        true,
-        null,
-        HADOOP_DEPENDENCY,
-        DISTRIBUTED_CACHE,
-        PRIORITY,
-        OUTPUT_PATH,
-        CLASSPATH_PREFIX,
-        null,
-        null
-    );
-    Assert.assertTrue(orig.isValidate());
-  }
-
-  @Test
-  public void testMinimal()
-  {
-    HadoopConverterTask parent = new HadoopConverterTask(
-        null,
-        DATA_SOURCE,
-        INTERVAL,
-        null,
-        true,
-        null,
-        null,
-        DISTRIBUTED_CACHE,
-        null,
-        OUTPUT_PATH,
-        null,
-        null,
-        null
-    );
-    Assert.assertEquals(DATA_SOURCE, parent.getDataSource());
-    Assert.assertEquals(INTERVAL, parent.getInterval());
-    Assert.assertEquals(DISTRIBUTED_CACHE, parent.getDistributedSuccessCache());
-    Assert.assertEquals(OUTPUT_PATH, parent.getSegmentOutputPath());
-    Assert.assertNotNull(parent.getId());
-    Assert.assertFalse(parent.getId().isEmpty());
-  }
-
-  @Test(expected = UnsupportedOperationException.class)
-  public void testGetDataSegment()
-  {
-    HadoopConverterTask orig = new HadoopConverterTask(
-        TASK_ID,
-        DATA_SOURCE,
-        INTERVAL,
-        INDEX_SPEC,
-        true,
-        null,
-        HADOOP_DEPENDENCY,
-        DISTRIBUTED_CACHE,
-        PRIORITY,
-        OUTPUT_PATH,
-        CLASSPATH_PREFIX,
-        null,
-        null
-    );
-    orig.getSegment();
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testNull1()
-  {
-    @SuppressWarnings("unused")
-    HadoopConverterTask parent = new HadoopConverterTask(
-        null,
-        null,
-        INTERVAL,
-        null,
-        true,
-        null,
-        null,
-        DISTRIBUTED_CACHE,
-        null,
-        OUTPUT_PATH,
-        null,
-        null,
-        null
-    );
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testNull2()
-  {
-    @SuppressWarnings("unused")
-    HadoopConverterTask parent = new HadoopConverterTask(
-        null,
-        DATA_SOURCE,
-        null,
-        null,
-        true,
-        null,
-        null,
-        DISTRIBUTED_CACHE,
-        null,
-        OUTPUT_PATH,
-        null,
-        null,
-        null
-    );
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testNull3()
-  {
-    @SuppressWarnings("unused")
-    HadoopConverterTask parent = new HadoopConverterTask(
-        null,
-        DATA_SOURCE,
-        INTERVAL,
-        null,
-        true,
-        null,
-        null,
-        null,
-        null,
-        OUTPUT_PATH,
-        null,
-        null,
-        null
-    );
-  }
-
-  @Test(expected = NullPointerException.class)
-  public void testNull4()
-  {
-    @SuppressWarnings("unused")
-    HadoopConverterTask parent = new HadoopConverterTask(
-        null,
-        DATA_SOURCE,
-        INTERVAL,
-        null,
-        true,
-        null,
-        null,
-        DISTRIBUTED_CACHE,
-        null,
-        null,
-        null,
-        null,
-        null
-    );
-  }
-}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
index a2c742e84f1..187916c105c 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskSerdeTest.java
@@ -40,9 +40,6 @@
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
 import org.apache.druid.segment.IndexSpec;
-import org.apache.druid.segment.data.CompressionFactory;
-import org.apache.druid.segment.data.CompressionStrategy;
-import org.apache.druid.segment.data.RoaringBitmapSerdeFactory;
 import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.indexing.RealtimeIOConfig;
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
@@ -52,7 +49,6 @@
 import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
 import org.apache.druid.segment.realtime.plumber.Plumber;
 import org.apache.druid.segment.realtime.plumber.PlumberSchool;
-import org.apache.druid.segment.writeout.TmpFileSegmentWriteOutMediumFactory;
 import org.apache.druid.server.security.AuthTestUtils;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
@@ -64,7 +60,6 @@
 import org.junit.rules.ExpectedException;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.List;
 
 public class TaskSerdeTest
@@ -457,61 +452,6 @@ public void testKillTaskSerde() throws Exception
     Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task3.getInterval());
   }
 
-  @Test
-  public void testVersionConverterTaskSerde() throws Exception
-  {
-    final ConvertSegmentTask task = ConvertSegmentTask.create(
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2010-01-01/P1D")).version("1234").build(),
-        null,
-        false,
-        true,
-        TmpFileSegmentWriteOutMediumFactory.instance(),
-        null
-    );
-
-    final String json = jsonMapper.writeValueAsString(task);
-
-    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
-    final ConvertSegmentTask task2 = (ConvertSegmentTask) jsonMapper.readValue(json, Task.class);
-
-    Assert.assertEquals("foo", task.getDataSource());
-    Assert.assertEquals(Intervals.of("2010-01-01/P1D"), task.getInterval());
-
-    Assert.assertEquals(task.getId(), task2.getId());
-    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
-    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
-    Assert.assertEquals(task.getInterval(), task2.getInterval());
-    Assert.assertEquals(task.getSegment(), task2.getSegment());
-    Assert.assertEquals(task.getSegmentWriteOutMediumFactory(), task2.getSegmentWriteOutMediumFactory());
-  }
-
-  @Test
-  public void testVersionConverterSubTaskSerde() throws Exception
-  {
-    final ConvertSegmentTask.SubTask task = new ConvertSegmentTask.SubTask(
-        "myGroupId",
-        DataSegment.builder().dataSource("foo").interval(Intervals.of("2010-01-01/P1D")).version("1234").build(),
-        indexSpec,
-        false,
-        true,
-        null,
-        null
-    );
-
-    final String json = jsonMapper.writeValueAsString(task);
-
-    Thread.sleep(100); // Just want to run the clock a bit to make sure the task id doesn't change
-    final ConvertSegmentTask.SubTask task2 = (ConvertSegmentTask.SubTask) jsonMapper.readValue(json, Task.class);
-
-    Assert.assertEquals("foo", task.getDataSource());
-    Assert.assertEquals("myGroupId", task.getGroupId());
-
-    Assert.assertEquals(task.getId(), task2.getId());
-    Assert.assertEquals(task.getGroupId(), task2.getGroupId());
-    Assert.assertEquals(task.getDataSource(), task2.getDataSource());
-    Assert.assertEquals(task.getSegment(), task2.getSegment());
-  }
-
   @Test
   public void testRealtimeIndexTaskSerde() throws Exception
   {
@@ -705,85 +645,6 @@ public void testRestoreTaskSerde() throws Exception
     Assert.assertEquals(task.getInterval(), task2.getInterval());
   }
 
-  @Test
-  public void testSegmentConvetSerdeReflection() throws IOException
-  {
-    final ConvertSegmentTask task = ConvertSegmentTask.create(
-        new DataSegment(
-            "dataSource",
-            Intervals.of("1990-01-01/1999-12-31"),
-            "version",
-            ImmutableMap.of(),
-            ImmutableList.of("dim1", "dim2"),
-            ImmutableList.of("metric1", "metric2"),
-            NoneShardSpec.instance(),
-            0,
-            12345L
-        ),
-        indexSpec,
-        false,
-        true,
-        TmpFileSegmentWriteOutMediumFactory.instance(),
-        null
-    );
-    final String json = jsonMapper.writeValueAsString(task);
-    final ConvertSegmentTask taskFromJson = jsonMapper.readValue(json, ConvertSegmentTask.class);
-    Assert.assertEquals(json, jsonMapper.writeValueAsString(taskFromJson));
-  }
-
-  @Test
-  public void testSegmentConvertSerde() throws IOException
-  {
-    final DataSegment segment = new DataSegment(
-        "dataSource",
-        Intervals.of("1990-01-01/1999-12-31"),
-        "version",
-        ImmutableMap.of(),
-        ImmutableList.of("dim1", "dim2"),
-        ImmutableList.of("metric1", "metric2"),
-        NoneShardSpec.instance(),
-        0,
-        12345L
-    );
-    final ConvertSegmentTask originalTask = ConvertSegmentTask.create(
-        segment,
-        new IndexSpec(
-            new RoaringBitmapSerdeFactory(null),
-            CompressionStrategy.LZF,
-            CompressionStrategy.UNCOMPRESSED,
-            CompressionFactory.LongEncodingStrategy.LONGS
-        ),
-        false,
-        true,
-        TmpFileSegmentWriteOutMediumFactory.instance(),
-        null
-    );
-    final String json = jsonMapper.writeValueAsString(originalTask);
-    final Task task = jsonMapper.readValue(json, Task.class);
-    Assert.assertTrue(task instanceof ConvertSegmentTask);
-    final ConvertSegmentTask convertSegmentTask = (ConvertSegmentTask) task;
-    Assert.assertEquals(originalTask.getDataSource(), convertSegmentTask.getDataSource());
-    Assert.assertEquals(originalTask.getInterval(), convertSegmentTask.getInterval());
-    Assert.assertEquals(
-        originalTask.getIndexSpec().getBitmapSerdeFactory().getClass().getCanonicalName(),
-        convertSegmentTask.getIndexSpec()
-                          .getBitmapSerdeFactory()
-                          .getClass()
-                          .getCanonicalName()
-    );
-    Assert.assertEquals(
-        originalTask.getIndexSpec().getDimensionCompression(),
-        convertSegmentTask.getIndexSpec().getDimensionCompression()
-    );
-    Assert.assertEquals(
-        originalTask.getIndexSpec().getMetricCompression(),
-        convertSegmentTask.getIndexSpec().getMetricCompression()
-    );
-    Assert.assertEquals(false, convertSegmentTask.isForce());
-    Assert.assertEquals(segment, convertSegmentTask.getSegment());
-    Assert.assertEquals(originalTask.getSegmentWriteOutMediumFactory(), convertSegmentTask.getSegmentWriteOutMediumFactory());
-  }
-
   @Test
   public void testMoveTaskSerde() throws Exception
   {
diff --git a/processing/src/main/java/org/apache/druid/segment/IndexIO.java b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
index 5088f9d23ba..6d74deb9bc0 100644
--- a/processing/src/main/java/org/apache/druid/segment/IndexIO.java
+++ b/processing/src/main/java/org/apache/druid/segment/IndexIO.java
@@ -64,7 +64,6 @@
 import org.apache.druid.segment.serde.FloatGenericColumnSupplier;
 import org.apache.druid.segment.serde.LongGenericColumnSupplier;
 import org.apache.druid.segment.serde.SpatialIndexColumnPartSupplier;
-import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
 import org.joda.time.Interval;
 
 import javax.annotation.Nullable;
@@ -95,18 +94,11 @@
   private static final SerializerUtils serializerUtils = new SerializerUtils();
 
   private final ObjectMapper mapper;
-  private final SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory;
 
   @Inject
-  public IndexIO(
-      ObjectMapper mapper,
-      SegmentWriteOutMediumFactory defaultSegmentWriteOutMediumFactory,
-      ColumnConfig columnConfig
-  )
+  public IndexIO(ObjectMapper mapper, ColumnConfig columnConfig)
   {
     this.mapper = Preconditions.checkNotNull(mapper, "null ObjectMapper");
-    this.defaultSegmentWriteOutMediumFactory =
-        Preconditions.checkNotNull(defaultSegmentWriteOutMediumFactory, "null 
SegmentWriteOutMediumFactory");
     Preconditions.checkNotNull(columnConfig, "null ColumnConfig");
     ImmutableMap.Builder<Integer, IndexLoader> indexLoadersBuilder = ImmutableMap.builder();
     LegacyIndexLoader legacyIndexLoader = new LegacyIndexLoader(new DefaultIndexIOHandler(), columnConfig);
@@ -199,21 +191,6 @@ public QueryableIndex loadIndex(File inDir) throws IOException
     }
   }
 
-  public static int getVersionFromDir(File inDir) throws IOException
-  {
-    File versionFile = new File(inDir, "version.bin");
-    if (versionFile.exists()) {
-      return Ints.fromByteArray(Files.toByteArray(versionFile));
-    }
-
-    final File indexFile = new File(inDir, "index.drd");
-    int version;
-    try (InputStream in = new FileInputStream(indexFile)) {
-      version = in.read();
-    }
-    return version;
-  }
-
   public static void checkFileSize(File indexFile) throws IOException
   {
     final long fileSize = indexFile.length();
@@ -222,32 +199,6 @@ public static void checkFileSize(File indexFile) throws IOException
     }
   }
 
-  public boolean convertSegment(
-      File toConvert,
-      File converted,
-      IndexSpec indexSpec,
-      boolean forceIfCurrent,
-      boolean validate,
-      @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
-  ) throws IOException
-  {
-    final int version = SegmentUtils.getVersionFromDir(toConvert);
-    boolean current = version == CURRENT_VERSION_ID;
-    if (!current || forceIfCurrent) {
-      if (segmentWriteOutMediumFactory == null) {
-        segmentWriteOutMediumFactory = this.defaultSegmentWriteOutMediumFactory;
-      }
-      new IndexMergerV9(mapper, this, segmentWriteOutMediumFactory).convert(toConvert, converted, indexSpec);
-      if (validate) {
-        validateTwoSegments(toConvert, converted);
-      }
-      return true;
-    } else {
-      log.info("Current version[%d], skipping.", version);
-      return false;
-    }
-  }
-
   interface IndexIOHandler
   {
     MMappedIndex mapDir(File inDir) throws IOException;
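
With convertSegment() and getVersionFromDir() gone, IndexIO is a pure reader and no longer needs a default write-out medium; only IndexMergerV9 keeps the factory. A minimal construction sketch against the new two-argument signature (the zero cache size is an arbitrary illustrative value, not a recommendation):

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.jackson.DefaultObjectMapper;
    import org.apache.druid.segment.IndexIO;
    import org.apache.druid.segment.column.ColumnConfig;

    public class IndexIOConstructionSketch
    {
      public static IndexIO make()
      {
        ObjectMapper mapper = new DefaultObjectMapper();
        // The SegmentWriteOutMediumFactory argument is gone; readers only need
        // a mapper plus a ColumnConfig.
        return new IndexIO(
            mapper,
            new ColumnConfig()
            {
              @Override
              public int columnCacheSizeBytes()
              {
                return 0; // arbitrary; deployments size this via configuration
              }
            }
        );
      }
    }
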
diff --git a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
index 5e1283f1b6d..5d48bb2afa5 100644
--- a/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
+++ b/processing/src/test/java/org/apache/druid/query/DoubleStorageTest.java
@@ -109,7 +109,7 @@
 
   private static final IndexMergerV9 INDEX_MERGER_V9 =
       TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
-  private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance());
+  private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
   private static final Integer MAX_ROWS = 10;
   private static final String TIME_COLUMN = "__time";
   private static final String DIM_NAME = "testDimName";
diff --git a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
index f1987267854..afe1a2f6fca 100644
--- a/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
+++ b/processing/src/test/java/org/apache/druid/query/MultiValuedDimensionTest.java
@@ -145,7 +145,7 @@ public void setup() throws Exception
     TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory)
               .persist(incrementalIndex, persistedSegmentDir, new IndexSpec(), null);
 
-    queryableIndex = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory).loadIndex(persistedSegmentDir);
+    queryableIndex = TestHelper.getTestIndexIO().loadIndex(persistedSegmentDir);
   }
 
   @After
diff --git a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
index 5855bb98a74..340212b79cd 100644
--- a/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
+++ b/processing/src/test/java/org/apache/druid/query/aggregation/AggregationTestHelper.java
@@ -34,6 +34,8 @@
 import com.google.common.collect.Maps;
 import com.google.common.io.Closeables;
 import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.io.LineIterator;
 import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.InputRow;
 import org.apache.druid.data.input.Row;
@@ -81,8 +83,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.io.LineIterator;
 import org.junit.rules.TemporaryFolder;
 
 import java.io.Closeable;
@@ -155,7 +155,6 @@ public static AggregationTestHelper createGroupByQueryAggregationTestHelper(
 
     IndexIO indexIO = new IndexIO(
         mapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
@@ -212,7 +211,6 @@ public static AggregationTestHelper createSelectQueryAggregationTestHelper(
 
     IndexIO indexIO = new IndexIO(
         mapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
@@ -254,7 +252,6 @@ public static AggregationTestHelper createTimeseriesQueryAggregationTestHelper(
 
     IndexIO indexIO = new IndexIO(
         mapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
@@ -309,7 +306,6 @@ public ByteBuffer get()
 
     IndexIO indexIO = new IndexIO(
         mapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
index 99a8b646a66..94224a7fef5 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownInsufficientBufferTest.java
@@ -30,6 +30,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.collections.CloseableDefaultBlockingPool;
 import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.InputRow;
@@ -77,7 +78,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -119,7 +119,6 @@
     );
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
index 6b12b2d0541..8e615ae0f97 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByLimitPushDownMultiNodeMergeTest.java
@@ -30,6 +30,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.collections.CloseableDefaultBlockingPool;
 import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.InputRow;
@@ -85,7 +86,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.junit.After;
@@ -129,7 +129,6 @@
     );
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
index 708fb95aca8..5c31833c03f 100644
--- a/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
+++ b/processing/src/test/java/org/apache/druid/query/groupby/GroupByMultiSegmentTest.java
@@ -28,6 +28,7 @@
 import com.google.common.collect.Maps;
 import com.google.common.io.Files;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.collections.CloseableDefaultBlockingPool;
 import org.apache.druid.collections.CloseableStupidPool;
 import org.apache.druid.data.input.InputRow;
@@ -74,7 +75,6 @@
 import org.apache.druid.segment.incremental.IncrementalIndex;
 import org.apache.druid.segment.incremental.IncrementalIndexSchema;
 import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
-import org.apache.commons.io.FileUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -114,7 +114,6 @@
     );
     INDEX_IO = new IndexIO(
         JSON_MAPPER,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/processing/src/test/java/org/apache/druid/segment/ConciseBitmapIndexMergerV9Test.java b/processing/src/test/java/org/apache/druid/segment/ConciseBitmapIndexMergerV9Test.java
index b2afbb378e9..e0dc642f423 100644
--- a/processing/src/test/java/org/apache/druid/segment/ConciseBitmapIndexMergerV9Test.java
+++ b/processing/src/test/java/org/apache/druid/segment/ConciseBitmapIndexMergerV9Test.java
@@ -40,8 +40,7 @@ public ConciseBitmapIndexMergerV9Test(
         new ConciseBitmapSerdeFactory(),
         compressionStrategy,
         dimCompressionStrategy,
-        longEncodingStrategy,
-        segmentWriteOutMediumFactory
+        longEncodingStrategy
     );
     indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
   }
diff --git a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
index 42d8d34d522..d63d3a382b2 100644
--- a/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/EmptyIndexTest.java
@@ -90,7 +90,7 @@ public void testEmptyIndex() throws Exception
           new IndexSpec()
       );
 
-      QueryableIndex emptyQueryableIndex = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory).loadIndex(tmpDir);
+      QueryableIndex emptyQueryableIndex = TestHelper.getTestIndexIO().loadIndex(tmpDir);
 
       Assert.assertEquals("getDimensionNames", 0, Iterables.size(emptyQueryableIndex.getAvailableDimensions()));
       Assert.assertEquals("getMetricNames", 0, emptyQueryableIndex.getColumnNames().size());
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
index 26b89a6c3e7..cbf5a4a8baf 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexBuilder.java
@@ -112,7 +112,7 @@ public QueryableIndex buildMMappedIndex()
     Preconditions.checkNotNull(indexMerger, "indexMerger");
     Preconditions.checkNotNull(tmpDir, "tmpDir");
     try (final IncrementalIndex incrementalIndex = buildIncrementalIndex()) {
-      return TestHelper.getTestIndexIO(segmentWriteOutMediumFactory).loadIndex(
+      return TestHelper.getTestIndexIO().loadIndex(
           indexMerger.persist(
               incrementalIndex,
              new File(tmpDir, StringUtils.format("testIndex-%s", ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE))),
@@ -135,7 +135,7 @@ public QueryableIndex buildMMappedMergedIndex()
     try {
       for (int i = 0; i < rows.size(); i += ROWS_PER_INDEX_FOR_MERGING) {
         persisted.add(
-            TestHelper.getTestIndexIO(segmentWriteOutMediumFactory).loadIndex(
+            TestHelper.getTestIndexIO().loadIndex(
                 indexMerger.persist(
                     buildIncrementalIndexWithRows(
                         schema,
@@ -149,7 +149,7 @@ public QueryableIndex buildMMappedMergedIndex()
             )
         );
       }
-      final QueryableIndex merged = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory).loadIndex(
+      final QueryableIndex merged = TestHelper.getTestIndexIO().loadIndex(
           indexMerger.merge(
               Lists.transform(
                   persisted,
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
index 356f9fe1728..fb07f27de67 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexIOTest.java
@@ -31,7 +31,6 @@
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.java.util.common.Intervals;
 import org.apache.druid.java.util.common.UOE;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.query.aggregation.Aggregator;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.data.CompressionFactory;
@@ -329,7 +328,7 @@ public void testRowValidatorEquals() throws Exception
   {
     Exception ex = null;
     try {
-      TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()).validateTwoSegments(adapter1, adapter2);
+      TestHelper.getTestIndexIO().validateTwoSegments(adapter1, adapter2);
     }
     catch (Exception e) {
       ex = e;
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
index fda318a960c..8db4364e0b6 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerNullHandlingTest.java
@@ -66,7 +66,7 @@
   public void setUp()
   {
     indexMerger = TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
-    indexIO = TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance());
+    indexIO = TestHelper.getTestIndexIO();
     indexSpec = new IndexSpec();
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
index f6178958205..f29035e6bb9 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerTestBase.java
@@ -142,8 +142,7 @@ protected IndexMergerTestBase(
       @Nullable BitmapSerdeFactory bitmapSerdeFactory,
       CompressionStrategy compressionStrategy,
       CompressionStrategy dimCompressionStrategy,
-      CompressionFactory.LongEncodingStrategy longEncodingStrategy,
-      SegmentWriteOutMediumFactory segmentWriteOutMediumFactory
+      CompressionFactory.LongEncodingStrategy longEncodingStrategy
   )
   {
     this.indexSpec = makeIndexSpec(
@@ -152,7 +151,7 @@ protected IndexMergerTestBase(
         dimCompressionStrategy,
         longEncodingStrategy
     );
-    this.indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+    this.indexIO = TestHelper.getTestIndexIO();
     this.useBitmapIndexes = bitmapSerdeFactory != null;
   }
 
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
index 75e5de2fada..bb5118d9c4d 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9CompatibilityTest.java
@@ -93,7 +93,7 @@
   public IndexMergerV9CompatibilityTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
   {
     indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
-    indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+    indexIO = TestHelper.getTestIndexIO();
     events = new ArrayList<>();
 
     final Map<String, Object> map1 = ImmutableMap.of(
diff --git a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
index bd5d144735e..dfc2dd7b7b2 100644
--- a/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/IndexMergerV9WithSpatialIndexTest.java
@@ -87,7 +87,7 @@
     List<Object[]> argumentArrays = new ArrayList<>();
     for (SegmentWriteOutMediumFactory segmentWriteOutMediumFactory : SegmentWriteOutMediumFactory.builtInFactories()) {
       IndexMergerV9 indexMergerV9 = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
-      IndexIO indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+      IndexIO indexIO = TestHelper.getTestIndexIO();
 
       final IndexSpec indexSpec = new IndexSpec();
       final IncrementalIndex rtIndex = makeIncrementalIndex();
diff --git a/processing/src/test/java/org/apache/druid/segment/NoBitmapIndexMergerV9Test.java b/processing/src/test/java/org/apache/druid/segment/NoBitmapIndexMergerV9Test.java
index df1be235440..8af1a701fd2 100644
--- a/processing/src/test/java/org/apache/druid/segment/NoBitmapIndexMergerV9Test.java
+++ b/processing/src/test/java/org/apache/druid/segment/NoBitmapIndexMergerV9Test.java
@@ -39,8 +39,7 @@ public NoBitmapIndexMergerV9Test(
         null,
         compressionStrategy,
         dimCompressionStrategy,
-        longEncodingStrategy,
-        segmentWriteOutMediumFactory
+        longEncodingStrategy
     );
     indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
   }
diff --git a/processing/src/test/java/org/apache/druid/segment/QueryableIndexIndexableAdapterTest.java b/processing/src/test/java/org/apache/druid/segment/QueryableIndexIndexableAdapterTest.java
index f333d39809d..ccb107080da 100644
--- a/processing/src/test/java/org/apache/druid/segment/QueryableIndexIndexableAdapterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/QueryableIndexIndexableAdapterTest.java
@@ -70,7 +70,7 @@
   public QueryableIndexIndexableAdapterTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
   {
     indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
-    indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+    indexIO = TestHelper.getTestIndexIO();
   }
 
   @Test
diff --git a/processing/src/test/java/org/apache/druid/segment/RoaringBitmapIndexMergerV9Test.java b/processing/src/test/java/org/apache/druid/segment/RoaringBitmapIndexMergerV9Test.java
index 5336d089749..0b88e979093 100644
--- a/processing/src/test/java/org/apache/druid/segment/RoaringBitmapIndexMergerV9Test.java
+++ b/processing/src/test/java/org/apache/druid/segment/RoaringBitmapIndexMergerV9Test.java
@@ -40,8 +40,7 @@ public RoaringBitmapIndexMergerV9Test(
         new RoaringBitmapSerdeFactory(null),
         compressionStrategy,
         dimCompressionStrategy,
-        longEncodingStrategy,
-        segmentWriteOutMediumFactory
+        longEncodingStrategy
     );
     indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
   }
diff --git a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
index 48920262393..a5c8db9852b 100644
--- a/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/SchemalessIndexTest.java
@@ -105,7 +105,7 @@
   public SchemalessIndexTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
   {
     indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
-    indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+    indexIO = TestHelper.getTestIndexIO();
   }
 
   public static IncrementalIndex getIncrementalIndex()
diff --git a/processing/src/test/java/org/apache/druid/segment/TestHelper.java b/processing/src/test/java/org/apache/druid/segment/TestHelper.java
index 78554660e25..c02e99e8055 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestHelper.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestHelper.java
@@ -53,14 +53,13 @@
 
   public static IndexMergerV9 getTestIndexMergerV9(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
   {
-    return new IndexMergerV9(JSON_MAPPER, getTestIndexIO(segmentWriteOutMediumFactory), segmentWriteOutMediumFactory);
+    return new IndexMergerV9(JSON_MAPPER, getTestIndexIO(), segmentWriteOutMediumFactory);
   }
 
-  public static IndexIO getTestIndexIO(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+  public static IndexIO getTestIndexIO()
   {
     return new IndexIO(
         JSON_MAPPER,
-        segmentWriteOutMediumFactory,
         new ColumnConfig()
         {
           @Override
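
For test code the division of labor is now explicit: getTestIndexIO() takes no arguments, while the write-out medium factory is still chosen per merger. A short usage sketch mirroring the call sites updated throughout this diff:

    import org.apache.druid.segment.IndexIO;
    import org.apache.druid.segment.IndexMergerV9;
    import org.apache.druid.segment.TestHelper;
    import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;

    public class TestHelperUsageSketch
    {
      public static void main(String[] args)
      {
        // Read side: no factory parameter anymore.
        IndexIO indexIO = TestHelper.getTestIndexIO();
        // Write side: the factory still selects the segment write-out medium.
        IndexMergerV9 indexMerger =
            TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
      }
    }
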
diff --git a/processing/src/test/java/org/apache/druid/segment/TestIndex.java b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
index 201354a430b..16e6c5dce3b 100644
--- a/processing/src/test/java/org/apache/druid/segment/TestIndex.java
+++ b/processing/src/test/java/org/apache/druid/segment/TestIndex.java
@@ -138,7 +138,7 @@
 
   private static final IndexMerger INDEX_MERGER =
       TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
-  private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance());
+  private static final IndexIO INDEX_IO = TestHelper.getTestIndexIO();
 
   static {
     if (ComplexMetrics.getSerdeForType("hyperUnique") == null) {
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
index 88840a4494b..120ad1e7c11 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterBonusTest.java
@@ -102,7 +102,7 @@ public SpatialFilterBonusTest(Segment segment)
     List<Object[]> argumentArrays = new ArrayList<>();
     for (SegmentWriteOutMediumFactory segmentWriteOutMediumFactory : SegmentWriteOutMediumFactory.builtInFactories()) {
       IndexMerger indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
-      IndexIO indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+      IndexIO indexIO = TestHelper.getTestIndexIO();
       final IndexSpec indexSpec = new IndexSpec();
       final IncrementalIndex rtIndex = makeIncrementalIndex();
       final QueryableIndex mMappedTestIndex = makeQueryableIndex(indexSpec, indexMerger, indexIO);
diff --git a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
index 14bbe8974cb..143d3851f00 100644
--- a/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/filter/SpatialFilterTest.java
@@ -77,7 +77,7 @@
 public class SpatialFilterTest
 {
   private static IndexMerger INDEX_MERGER = TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance());
-  private static IndexIO INDEX_IO = TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance());
+  private static IndexIO INDEX_IO = TestHelper.getTestIndexIO();
 
   public static final int NUM_POINTS = 5000;
   private static Interval DATA_INTERVAL = Intervals.of("2013-01-01/2013-01-07");
diff --git a/processing/src/test/java/org/apache/druid/segment/loading/SegmentizerFactoryTest.java b/processing/src/test/java/org/apache/druid/segment/loading/SegmentizerFactoryTest.java
index 105e47ff557..9873ebb188d 100644
--- a/processing/src/test/java/org/apache/druid/segment/loading/SegmentizerFactoryTest.java
+++ b/processing/src/test/java/org/apache/druid/segment/loading/SegmentizerFactoryTest.java
@@ -23,7 +23,6 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.jackson.SegmentizerModule;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.segment.IndexIO;
 import org.apache.druid.segment.column.ColumnConfig;
 import org.junit.Assert;
@@ -45,7 +44,6 @@ public void testFactory() throws IOException
     mapper.registerModule(new SegmentizerModule());
     IndexIO indexIO = new IndexIO(
         mapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
diff --git a/server/src/main/java/org/apache/druid/client/indexing/ClientConversionQuery.java b/server/src/main/java/org/apache/druid/client/indexing/ClientConversionQuery.java
deleted file mode 100644
index b38bc7d6d83..00000000000
--- a/server/src/main/java/org/apache/druid/client/indexing/ClientConversionQuery.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.Interval;
-
-/**
- */
-public class ClientConversionQuery
-{
-  private final String dataSource;
-  private final Interval interval;
-  private final DataSegment segment;
-
-  public ClientConversionQuery(
-      DataSegment segment
-  )
-  {
-    this.dataSource = segment.getDataSource();
-    this.interval = segment.getInterval();
-    this.segment = segment;
-  }
-
-  public ClientConversionQuery(
-      String dataSource,
-      Interval interval
-  )
-  {
-    this.dataSource = dataSource;
-    this.interval = interval;
-    this.segment = null;
-  }
-
-  @JsonProperty
-  public String getType()
-  {
-    return "version_converter";
-  }
-
-  @JsonProperty
-  public String getDataSource()
-  {
-    return dataSource;
-  }
-
-  @JsonProperty
-  public Interval getInterval()
-  {
-    return interval;
-  }
-
-  @JsonProperty
-  public DataSegment getSegment()
-  {
-    return segment;
-  }
-}
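
Serialized through its @JsonProperty getters, the removed ClientConversionQuery was the JSON body the coordinator POSTed to the overlord to spawn a "version_converter" task. A sketch of producing that payload against the pre-removal class; the dataSource and interval are illustrative, and the exact field rendering is approximate:

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.druid.client.indexing.ClientConversionQuery;
    import org.apache.druid.jackson.DefaultObjectMapper;
    import org.apache.druid.java.util.common.Intervals;

    public class ConversionPayloadSketch
    {
      public static void main(String[] args) throws Exception
      {
        ObjectMapper mapper = new DefaultObjectMapper(); // registers Joda serializers for Interval
        // Prints roughly: {"type":"version_converter","dataSource":"wikipedia",
        //   "interval":"2015-01-01T00:00:00.000Z/2015-01-02T00:00:00.000Z","segment":null}
        System.out.println(mapper.writeValueAsString(
            new ClientConversionQuery("wikipedia", Intervals.of("2015-01-01/2015-01-02"))
        ));
      }
    }
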
diff --git a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
index a3cf5c608c9..d6e9cb61a3d 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/HttpIndexingServiceClient.java
@@ -87,12 +87,6 @@ public void killSegments(String dataSource, Interval interval)
     runTask(new ClientKillQuery(dataSource, interval));
   }
 
-  @Override
-  public void upgradeSegment(DataSegment dataSegment)
-  {
-    runTask(new ClientConversionQuery(dataSegment));
-  }
-
   @Override
   public String compactSegments(
       List<DataSegment> segments,
diff --git a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
index a15cd54af4d..19827ce9f79 100644
--- a/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
+++ b/server/src/main/java/org/apache/druid/client/indexing/IndexingServiceClient.java
@@ -34,8 +34,6 @@
 
   int killPendingSegments(String dataSource, DateTime end);
 
-  void upgradeSegment(DataSegment dataSegment);
-
   void mergeSegments(List<DataSegment> segments);
 
   String compactSegments(
diff --git a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java b/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java
deleted file mode 100644
index 5ef69344810..00000000000
--- a/server/src/main/java/org/apache/druid/server/coordinator/helper/DruidCoordinatorVersionConverter.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.server.coordinator.helper;
-
-import com.google.inject.Inject;
-import org.apache.druid.java.util.emitter.EmittingLogger;
-import org.apache.druid.client.indexing.IndexingServiceClient;
-import org.apache.druid.common.config.JacksonConfigManager;
-import org.apache.druid.segment.IndexIO;
-import org.apache.druid.server.coordinator.DatasourceWhitelist;
-import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
-import org.apache.druid.timeline.DataSegment;
-
-import java.util.concurrent.atomic.AtomicReference;
-
-public class DruidCoordinatorVersionConverter implements DruidCoordinatorHelper
-{
-  private static final EmittingLogger log = new EmittingLogger(DruidCoordinatorVersionConverter.class);
-
-  private final IndexingServiceClient indexingServiceClient;
-  private final AtomicReference<DatasourceWhitelist> whitelistRef;
-
-  @Inject
-  public DruidCoordinatorVersionConverter(
-      IndexingServiceClient indexingServiceClient,
-      JacksonConfigManager configManager
-  )
-  {
-    this.indexingServiceClient = indexingServiceClient;
-    this.whitelistRef = configManager.watch(DatasourceWhitelist.CONFIG_KEY, DatasourceWhitelist.class);
-  }
-
-  @Override
-  public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params)
-  {
-    DatasourceWhitelist whitelist = whitelistRef.get();
-
-    for (DataSegment dataSegment : params.getAvailableSegments()) {
-      if (whitelist == null || whitelist.contains(dataSegment.getDataSource())) {
-        final Integer binaryVersion = dataSegment.getBinaryVersion();
-
-        if (binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID) {
-          log.info("Upgrading version on segment[%s]", dataSegment.getIdentifier());
-          indexingServiceClient.upgradeSegment(dataSegment);
-        }
-      }
-    }
-
-    return params;
-  }
-}
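
The helper deleted above drove coordinator-side segment upgrades. Its core eligibility test survives in APIs this PR keeps (DataSegment.getBinaryVersion() and IndexIO.CURRENT_VERSION_ID), so a sketch of an equivalent standalone check, should anyone still need to locate out-of-date segments, could look like this:

    import org.apache.druid.segment.IndexIO;
    import org.apache.druid.timeline.DataSegment;

    // A segment needs rebuilding when it predates the current binary format,
    // or when its binary version was never recorded.
    static boolean isOutdated(DataSegment segment)
    {
      final Integer binaryVersion = segment.getBinaryVersion();
      return binaryVersion == null || binaryVersion < IndexIO.CURRENT_VERSION_ID;
    }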
diff --git a/server/src/test/java/org/apache/druid/client/indexing/ClientConversionQueryTest.java b/server/src/test/java/org/apache/druid/client/indexing/ClientConversionQueryTest.java
deleted file mode 100644
index c64c7632671..00000000000
--- a/server/src/test/java/org/apache/druid/client/indexing/ClientConversionQueryTest.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.druid.client.indexing;
-
-import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.timeline.DataSegment;
-import org.joda.time.DateTime;
-import org.joda.time.Interval;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ClientConversionQueryTest
-{
-  private ClientConversionQuery clientConversionQuery;
-  private static final String DATA_SOURCE = "data_source";
-  public static final DateTime START = DateTimes.nowUtc();
-  private static final Interval INTERVAL = new Interval(START, START.plus(1));
-  private static final DataSegment DATA_SEGMENT =
-      new DataSegment(DATA_SOURCE, INTERVAL, START.toString(), null, null, null, null, 0, 0);
-
-  @Test
-  public void testGetType()
-  {
-    clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT);
-    Assert.assertEquals("version_converter", clientConversionQuery.getType());
-  }
-
-  @Test
-  public void testGetDataSource()
-  {
-    clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT);
-    Assert.assertEquals(DATA_SOURCE, clientConversionQuery.getDataSource());
-
-  }
-
-  @Test
-  public void testGetInterval()
-  {
-    clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT);
-    Assert.assertEquals(INTERVAL, clientConversionQuery.getInterval());
-  }
-
-  @Test
-  public void testGetSegment()
-  {
-    clientConversionQuery = new ClientConversionQuery(DATA_SEGMENT);
-    Assert.assertEquals(DATA_SEGMENT, clientConversionQuery.getSegment());
-    clientConversionQuery = new ClientConversionQuery(DATA_SOURCE, INTERVAL);
-    Assert.assertNull(clientConversionQuery.getSegment());
-  }
-}
diff --git a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
index 664daf282d3..dcce4e82d57 100644
--- a/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
+++ b/server/src/test/java/org/apache/druid/client/indexing/NoopIndexingServiceClient.java
@@ -42,12 +42,6 @@ public int killPendingSegments(String dataSource, DateTime end)
     return 0;
   }
 
-  @Override
-  public void upgradeSegment(DataSegment dataSegment)
-  {
-
-  }
-
   @Override
   public void mergeSegments(List<DataSegment> segments)
   {
diff --git a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
index c44b0d369e6..844eba701e1 100644
--- a/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
+++ b/server/src/test/java/org/apache/druid/segment/loading/SegmentLoaderLocalCacheManagerTest.java
@@ -94,7 +94,7 @@ public void setUp() throws Exception
     locations.add(locationConfig);
 
     manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
+        TestHelper.getTestIndexIO(),
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -168,7 +168,7 @@ public void testRetrySuccessAtFirstLocation() throws Exception
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
+        TestHelper.getTestIndexIO(),
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -221,7 +221,7 @@ public void testRetrySuccessAtSecondLocation() throws Exception
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
+        TestHelper.getTestIndexIO(),
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -276,7 +276,7 @@ public void testRetryAllFail() throws Exception
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
+        TestHelper.getTestIndexIO(),
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
@@ -330,7 +330,7 @@ public void testEmptyToFullOrder() throws Exception
     locations.add(locationConfig2);
 
     manager = new SegmentLoaderLocalCacheManager(
-        TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
+        TestHelper.getTestIndexIO(),
         new SegmentLoaderConfig().withLocations(locations),
         jsonMapper
     );
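
The five hunks above all reflect the same signature change: with the SegmentWriteOutMediumFactory argument dropped from IndexIO, TestHelper.getTestIndexIO() no longer takes a parameter. In caller terms (a sketch; the variable names follow the test above):

    // before this PR:
    // indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
    // after this PR, the write-out medium factory no longer influences reads:
    indexIO = TestHelper.getTestIndexIO();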
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java
index d7162d1186e..9c373bf97da 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/FireDepartmentTest.java
@@ -30,7 +30,6 @@
 import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.CountAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
@@ -39,6 +38,7 @@
 import org.apache.druid.segment.indexing.RealtimeTuningConfig;
 import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec;
 import org.apache.druid.segment.realtime.plumber.RealtimePlumberSchool;
+import org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -113,7 +113,7 @@ public void testSerde() throws Exception
                 null,
                 null,
                 TestHelper.getTestIndexMergerV9(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
-                TestHelper.getTestIndexIO(OffHeapMemorySegmentWriteOutMediumFactory.instance()),
+                TestHelper.getTestIndexIO(),
                 MapCache.create(0),
                 NO_CACHE_CONFIG,
                 new CachePopulatorStats(),
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
index 9c1dfdefbcf..3b6a9d92c47 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/AppenderatorTester.java
@@ -21,6 +21,7 @@
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
+import org.apache.commons.io.FileUtils;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulatorStats;
 import org.apache.druid.client.cache.MapCache;
@@ -57,7 +58,6 @@
 import org.apache.druid.server.coordination.DataSegmentAnnouncer;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.LinearShardSpec;
-import org.apache.commons.io.FileUtils;
 
 import java.io.File;
 import java.io.IOException;
@@ -167,7 +167,6 @@ public AppenderatorTester(
 
     indexIO = new IndexIO(
         objectMapper,
-        OffHeapMemorySegmentWriteOutMediumFactory.instance(),
         new ColumnConfig()
         {
           @Override
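
As in the benchmarks, AppenderatorTester now builds IndexIO without a write-out medium factory. A sketch of the post-PR constructor shape, with an illustrative ColumnConfig (the anonymous class continues as in the truncated context above; the returned cache size here is an assumption):

    IndexIO indexIO = new IndexIO(
        objectMapper,
        new ColumnConfig()
        {
          @Override
          public int columnCacheSizeBytes()
          {
            return 0; // illustrative: no column cache
          }
        }
    );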
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java b/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
index 1ce56627c55..d6c51a91c52 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/firehose/IngestSegmentFirehoseTest.java
@@ -112,7 +112,7 @@
 
  public IngestSegmentFirehoseTest(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
   {
-    indexIO = TestHelper.getTestIndexIO(segmentWriteOutMediumFactory);
+    indexIO = TestHelper.getTestIndexIO();
    indexMerger = TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory);
   }
 
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
index 5201e0b9139..b05dd09c23b 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/plumber/RealtimePlumberSchoolTest.java
@@ -221,7 +221,7 @@ public void setUp() throws Exception
         handoffNotifierFactory,
         MoreExecutors.sameThreadExecutor(),
         TestHelper.getTestIndexMergerV9(segmentWriteOutMediumFactory),
-        TestHelper.getTestIndexIO(segmentWriteOutMediumFactory),
+        TestHelper.getTestIndexIO(),
         MapCache.create(0),
         FireDepartmentTest.NO_CACHE_CONFIG,
         new CachePopulatorStats(),
diff --git a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
index 61fe943484c..14dc152dac8 100644
--- a/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordinator/DruidCoordinatorConfigTest.java
@@ -55,7 +55,6 @@ public void testDeserialization()
     props.setProperty("druid.coordinator.period", "PT1s");
     props.setProperty("druid.coordinator.period.indexingPeriod", "PT1s");
     props.setProperty("druid.coordinator.merge.on", "true");
-    props.setProperty("druid.coordinator.conversion.on", "true");
     props.setProperty("druid.coordinator.kill.on", "true");
     props.setProperty("druid.coordinator.kill.period", "PT1s");
     props.setProperty("druid.coordinator.kill.durationToRetain", "PT1s");
diff --git a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
index 750a9fa5a8c..90268426127 100644
--- a/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
+++ b/services/src/main/java/org/apache/druid/cli/CliCoordinator.java
@@ -29,6 +29,7 @@
 import com.google.inject.Provides;
 import com.google.inject.name.Names;
 import io.airlift.airline.Command;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.druid.audit.AuditManager;
 import org.apache.druid.client.CoordinatorServerView;
 import org.apache.druid.client.HttpServerInventoryViewResource;
@@ -66,7 +67,6 @@
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentKiller;
 import org.apache.druid.server.coordinator.helper.DruidCoordinatorSegmentMerger;
-import org.apache.druid.server.coordinator.helper.DruidCoordinatorVersionConverter;
 import org.apache.druid.server.http.ClusterResource;
 import org.apache.druid.server.http.CoordinatorCompactionConfigsResource;
 import org.apache.druid.server.http.CoordinatorDynamicConfigsResource;
@@ -86,7 +86,6 @@
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManager;
 import org.apache.druid.server.lookup.cache.LookupCoordinatorManagerConfig;
 import org.apache.druid.server.router.TieredBrokerConfig;
-import org.apache.curator.framework.CuratorFramework;
 import org.eclipse.jetty.server.Server;
 
 import java.util.ArrayList;
@@ -208,10 +207,6 @@ public void configure(Binder binder)
                 "druid.coordinator.merge.on",
                 Predicates.equalTo("true"),
                 DruidCoordinatorSegmentMerger.class
-            ).addConditionBinding(
-                "druid.coordinator.conversion.on",
-                Predicates.equalTo("true"),
-                DruidCoordinatorVersionConverter.class
             ).addConditionBinding(
                 "druid.coordinator.kill.on",
                 Predicates.equalTo("true"),
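
With the conversion binding removed, setting druid.coordinator.conversion.on no longer has any effect; the coordinator's conditional helper bindings reduce to merge and kill. A sketch of the surviving chain (the binder variable name is illustrative, and the kill binding's target class is inferred from the imports above because the hunk is truncated here):

    conditionalHelperBinder
        .addConditionBinding(
            "druid.coordinator.merge.on",
            Predicates.equalTo("true"),
            DruidCoordinatorSegmentMerger.class
        )
        .addConditionBinding(
            "druid.coordinator.kill.on",
            Predicates.equalTo("true"),
            DruidCoordinatorSegmentKiller.class
        );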


 
