This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 36298f0aa4 [Clean up] Simplify IngestionConfig construction (#8743)
36298f0aa4 is described below

commit 36298f0aa4d3d8e2b8d30768cd8cef5ee4edec7a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed May 25 10:51:10 2022 -0700

    [Clean up] Simplify IngestionConfig construction (#8743)
---
 .../common/utils/config/TableConfigUtils.java      |  25 +-
 .../common/utils/config/TableConfigSerDeTest.java  |  35 ++-
 .../connector/flink/http/PinotConnectionUtils.java |  46 ++--
 .../flink/sink/PinotSinkIntegrationTest.java       |   6 +-
 .../pinot/controller/util/FileIngestionHelper.java |   8 +-
 .../helix/core/PinotHelixResourceManagerTest.java  |   6 +-
 .../core/retention/SegmentLineageCleanupTest.java  |   4 +-
 .../apache/pinot/core/util/SchemaUtilsTest.java    |  11 +-
 .../IngestionConfigHybridIntegrationTest.java      |  26 +-
 .../tests/JsonPathClusterIntegrationTest.java      |  10 +-
 .../tests/MapTypeClusterIntegrationTest.java       |  10 +-
 .../tests/OfflineClusterIntegrationTest.java       |  11 +-
 .../SegmentWriterUploaderIntegrationTest.java      |   8 +-
 .../preprocess/DataPreprocessingHelperTest.java    |   4 +-
 .../mergerollup/MergeRollupTaskGeneratorTest.java  |   4 +-
 .../RealtimeToOfflineSegmentsTaskExecutorTest.java |  13 +-
 .../filebased/FileBasedSegmentWriterTest.java      | 100 ++++----
 .../ExpressionTransformerTest.java                 |  95 ++++----
 .../recordtransformer/RecordTransformerTest.java   |  83 +++----
 .../SegmentGenerationWithFilterRecordsTest.java    |   4 +-
 .../index/loader/SegmentPreProcessorTest.java      |  14 +-
 .../segment/local/utils/IngestionUtilsTest.java    |  81 +++----
 .../segment/local/utils/TableConfigUtilsTest.java  | 263 +++++++--------------
 .../table/ingestion/BatchIngestionConfig.java      |   6 +-
 .../config/table/ingestion/IngestionConfig.java    |  15 +-
 .../pinot/spi/utils/IngestionConfigUtilsTest.java  |  31 +--
 26 files changed, 384 insertions(+), 535 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index e515249761..f429cdf7b3 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.collections.MapUtils;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.spi.config.table.FieldConfig;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -218,8 +219,7 @@ public class TableConfigUtils {
   }
 
   /**
-   * Helper method to convert from legacy/deprecated configs into current 
version
-   * of TableConfig.
+   * Helper method to convert from legacy/deprecated configs into current 
version of TableConfig.
    * <ul>
    *   <li>Moves deprecated ingestion related configs into Ingestion 
Config.</li>
    *   <li>The conversion happens in-place, the specified tableConfig is 
mutated in-place.</li>
@@ -247,7 +247,6 @@ public class TableConfigUtils {
       if (batchIngestionConfig.getSegmentIngestionType() == null) {
         batchIngestionConfig.setSegmentIngestionType(segmentPushType);
       }
-
       if (batchIngestionConfig.getSegmentIngestionFrequency() == null) {
         
batchIngestionConfig.setSegmentIngestionFrequency(segmentPushFrequency);
       }
@@ -258,32 +257,26 @@ public class TableConfigUtils {
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
 
     if (streamIngestionConfig == null) {
-      Map<String, String> streamConfigs = indexingConfig.getStreamConfigs();
-
       // Only set the new config if the deprecated one is set.
-      if (streamConfigs != null && !streamConfigs.isEmpty()) {
+      Map<String, String> streamConfigs = indexingConfig.getStreamConfigs();
+      if (MapUtils.isNotEmpty(streamConfigs)) {
         streamIngestionConfig = new 
StreamIngestionConfig(Collections.singletonList(streamConfigs));
       }
     }
 
     if (ingestionConfig == null) {
       if (batchIngestionConfig != null || streamIngestionConfig != null) {
-        ingestionConfig = new IngestionConfig(batchIngestionConfig, 
streamIngestionConfig, null, null, null, null);
-      }
-    } else {
-      if (batchIngestionConfig != null) {
+        ingestionConfig = new IngestionConfig();
         ingestionConfig.setBatchIngestionConfig(batchIngestionConfig);
-      }
-
-      if (streamIngestionConfig != null) {
         ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
       }
+    } else {
+      ingestionConfig.setBatchIngestionConfig(batchIngestionConfig);
+      ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
     }
 
     // Set the new config fields.
-    if (ingestionConfig != null) {
-      tableConfig.setIngestionConfig(ingestionConfig);
-    }
+    tableConfig.setIngestionConfig(ingestionConfig);
 
     // Clear the deprecated ones.
     indexingConfig.setStreamConfigs(null);
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 05f12d04a0..bdbd41b685 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.common.utils.config;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import com.google.common.collect.Lists;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -268,27 +267,21 @@ public class TableConfigSerDeTest {
     }
     {
       // With ingestion config
-      List<AggregationConfig> aggregationConfigs = Lists.newArrayList(new 
AggregationConfig("SUM__bar", "SUM(bar)"),
-          new AggregationConfig("MIN_foo", "MIN(foo)"));
-      List<TransformConfig> transformConfigs =
-          Lists.newArrayList(new TransformConfig("bar", "func(moo)"), new 
TransformConfig("zoo", "myfunc()"));
-      Map<String, String> batchConfigMap = new HashMap<>();
-      batchConfigMap.put("batchType", "s3");
-      Map<String, String> streamConfigMap = new HashMap<>();
-      streamConfigMap.put("streamType", "kafka");
-      List<Map<String, String>> streamConfigMaps = new ArrayList<>();
-      streamConfigMaps.add(streamConfigMap);
-      List<Map<String, String>> batchConfigMaps = new ArrayList<>();
-      batchConfigMaps.add(batchConfigMap);
-      List<String> fieldsToUnnest = Arrays.asList("c1, c2");
-      Map<String, String> prefixesToRename = new HashMap<>();
-      IngestionConfig ingestionConfig =
-          new IngestionConfig(new BatchIngestionConfig(batchConfigMaps, 
"APPEND", "HOURLY"),
-              new StreamIngestionConfig(streamConfigMaps), new 
FilterConfig("filterFunc(foo)"), transformConfigs,
-              new ComplexTypeConfig(fieldsToUnnest, ".", 
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
-                  prefixesToRename), aggregationConfigs);
-      TableConfig tableConfig = 
tableConfigBuilder.setIngestionConfig(ingestionConfig).build();
+      IngestionConfig ingestionConfig = new IngestionConfig();
+      ingestionConfig.setBatchIngestionConfig(
+          new 
BatchIngestionConfig(Collections.singletonList(Collections.singletonMap("batchType",
 "s3")), "APPEND",
+              "HOURLY"));
+      ingestionConfig.setStreamIngestionConfig(
+          new 
StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType",
 "kafka"))));
+      ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
+      ingestionConfig.setTransformConfigs(
+          Arrays.asList(new TransformConfig("bar", "func(moo)"), new 
TransformConfig("zoo", "myfunc()")));
+      ingestionConfig.setComplexTypeConfig(new 
ComplexTypeConfig(Arrays.asList("c1", "c2"), ".",
+          ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, 
Collections.emptyMap()));
+      ingestionConfig.setAggregationConfigs(
+          Arrays.asList(new AggregationConfig("SUM__bar", "SUM(bar)"), new 
AggregationConfig("MIN_foo", "MIN(foo)")));
 
+      TableConfig tableConfig = 
tableConfigBuilder.setIngestionConfig(ingestionConfig).build();
       checkIngestionConfig(tableConfig);
 
       // Serialize then de-serialize
diff --git 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
index 500b8fed26..c6a1b43da9 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
@@ -18,8 +18,7 @@
  */
 package org.apache.pinot.connector.flink.http;
 
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +47,7 @@ public final class PinotConnectionUtils {
   }
 
   public static TableConfig getTableConfig(ControllerRequestClient client, 
String tableName, String tableType) {
-    TableConfig tableConfig = null;
+    TableConfig tableConfig;
     try {
       tableConfig = client.getTableConfig(tableName, 
TableType.valueOf(tableType));
     } catch (Exception e) {
@@ -57,37 +56,34 @@ public final class PinotConnectionUtils {
 
     LOGGER.info("fetched pinot config {}", tableConfig);
 
-    Map<String, String> newBatchConfigMaps = new HashMap<>();
+    Map<String, String> newBatchConfigMap = new HashMap<>();
     // append the batch config of controller URI
     String controllerBaseUrl = 
client.getControllerRequestURLBuilder().getBaseUrl();
-    newBatchConfigMaps.put("push.controllerUri", controllerBaseUrl);
-    newBatchConfigMaps.put("outputDirURI", "/tmp/pinotoutput");
+    newBatchConfigMap.put("push.controllerUri", controllerBaseUrl);
+    newBatchConfigMap.put("outputDirURI", "/tmp/pinotoutput");
+
     IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
     if (ingestionConfig == null) {
-      tableConfig.setIngestionConfig(
-          new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND", 
"HOURLY"),
-              null, null, null, null, null));
+      ingestionConfig = new IngestionConfig();
+      ingestionConfig.setBatchIngestionConfig(
+          new 
BatchIngestionConfig(Collections.singletonList(newBatchConfigMap), "APPEND", 
"HOURLY"));
+      tableConfig.setIngestionConfig(ingestionConfig);
       return tableConfig;
     }
-    if (ingestionConfig.getBatchIngestionConfig() == null) {
-      tableConfig.setIngestionConfig(
-          new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND", 
"HOURLY"),
-              null, ingestionConfig.getFilterConfig(), 
ingestionConfig.getTransformConfigs(),
-              ingestionConfig.getComplexTypeConfig(), 
ingestionConfig.getAggregationConfigs()));
+
+    BatchIngestionConfig batchIngestionConfig = 
ingestionConfig.getBatchIngestionConfig();
+    if (batchIngestionConfig == null) {
+      ingestionConfig.setBatchIngestionConfig(
+          new 
BatchIngestionConfig(Collections.singletonList(newBatchConfigMap), "APPEND", 
"HOURLY"));
       return tableConfig;
     }
 
-    List<Map<String, String>> batchConfigMaps =
-        ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps() == null 
? new ArrayList<>()
-            : ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps();
-    batchConfigMaps.add(newBatchConfigMaps);
-
-    tableConfig.setIngestionConfig(new IngestionConfig(
-        new BatchIngestionConfig(batchConfigMaps, 
ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-            
ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null,
-        ingestionConfig.getFilterConfig(), 
ingestionConfig.getTransformConfigs(),
-        ingestionConfig.getComplexTypeConfig(), 
ingestionConfig.getAggregationConfigs()));
-
+    List<Map<String, String>> batchConfigMaps = 
batchIngestionConfig.getBatchConfigMaps();
+    if (batchConfigMaps == null) {
+      
batchIngestionConfig.setBatchConfigMaps(Collections.singletonList(newBatchConfigMap));
+    } else {
+      batchConfigMaps.add(newBatchConfigMap);
+    }
     return tableConfig;
   }
 }
diff --git 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
index a9a9086716..d48dc6016c 100644
--- 
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
+++ 
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
@@ -79,9 +79,9 @@ public class PinotSinkIntegrationTest extends 
BaseClusterIntegrationTest {
     batchConfigs.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_tarDir.getAbsolutePath());
     batchConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
     batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerBaseApiUrl);
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(new 
BatchIngestionConfig(Collections.singletonList(batchConfigs), "APPEND", 
"HOURLY"), null,
-            null, null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigs), 
"APPEND", "HOURLY"));
     _tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
             .build();
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 1b1238c4a2..380b62c55b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -19,13 +19,13 @@
 package org.apache.pinot.controller.util;
 
 import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
@@ -129,7 +129,7 @@ public class FileIngestionHelper {
         batchConfigMapOverride.put(segmentNamePostfixProp, 
String.valueOf(System.currentTimeMillis()));
       }
       BatchIngestionConfig batchIngestionConfigOverride =
-          new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
+          new 
BatchIngestionConfig(Collections.singletonList(batchConfigMapOverride),
               IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
               
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
 
@@ -147,8 +147,8 @@ public class FileIngestionHelper {
       TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName), 
segmentTarFile);
 
       // Upload segment
-      IngestionConfig ingestionConfigOverride =
-          new IngestionConfig(batchIngestionConfigOverride, null, null, null, 
null, null);
+      IngestionConfig ingestionConfigOverride = new IngestionConfig();
+      
ingestionConfigOverride.setBatchIngestionConfig(batchIngestionConfigOverride);
       TableConfig tableConfigOverride =
           new 
TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName())
               .setIngestionConfig(ingestionConfigOverride).build();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index b167a18256..12fad8d66a 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -786,12 +786,12 @@ public class PinotHelixResourceManagerTest {
       throws IOException, InterruptedException {
 
     // Create the table
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"REFRESH", "DAILY"));
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
             
.setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
-            .setIngestionConfig(
-                new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null, null))
-            .build();
+            .setIngestionConfig(ingestionConfig).build();
 
     TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index 16e70e91eb..66246b9c62 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -100,8 +100,8 @@ public class SegmentLineageCleanupTest {
             .setRetentionTimeValue(RETENTION_TIME_VALUE).build();
     TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
 
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", 
"DAILY"), null, null, null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"REFRESH", "DAILY"));
     TableConfig refreshTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME)
         
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1)
         
.setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE)
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java 
b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index af3d226708..85a834e9da 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.util;
 import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.segment.local.utils.SchemaUtils;
@@ -91,9 +92,10 @@ public class SchemaUtilsTest {
 
     // schema doesn't have destination columns from transformConfigs
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("colA", "round(colB, 1000)")),
-            null, null)).build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("colA", "round(colB, 1000)")));
+    tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build();
     try {
       SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
       Assert.fail("Should fail schema validation, as colA is not present in 
schema");
@@ -140,8 +142,7 @@ public class SchemaUtilsTest {
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
         .addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH", 
"1:HOURS").build();
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
-        .setIngestionConfig(new IngestionConfig(null, null, null,
-            Lists.newArrayList(new TransformConfig("colA", "round(colB, 
1000)")), null, null)).build();
+        .setIngestionConfig(ingestionConfig).build();
     try {
       SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
       Assert.fail("Should fail schema validation, as colA is not present in 
schema");
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
index 9fdd12e940..4f8a752226 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
@@ -20,7 +20,8 @@ package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import java.io.File;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -60,17 +61,18 @@ public class IngestionConfigHybridIntegrationTest extends 
BaseClusterIntegration
 
   @Override
   protected IngestionConfig getIngestionConfig() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(
+        new 
StreamIngestionConfig(Collections.singletonList(getStreamConfigMap())));
     FilterConfig filterConfig =
         new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5 
}, AirlineID, ArrDelayMinutes)");
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? 
\"AM\": \"PM\"}, DepTime)"));
-    transformConfigs.add(new TransformConfig("millisSinceEpoch", 
"fromEpochDays(DaysSinceEpoch)"));
-    transformConfigs.add(new TransformConfig("lowerCaseDestCityName", 
"lower(DestCityName)"));
-
-    List<Map<String, String>> streamConfigMaps = new ArrayList<>();
-    streamConfigMaps.add(getStreamConfigMap());
-    return new IngestionConfig(null, new 
StreamIngestionConfig(streamConfigMaps), filterConfig, transformConfigs, null,
-        null);
+    ingestionConfig.setFilterConfig(filterConfig);
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"}, 
DepTime)"),
+        new TransformConfig("millisSinceEpoch", 
"fromEpochDays(DaysSinceEpoch)"),
+        new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)"));
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    return ingestionConfig;
   }
 
   @Override
@@ -139,8 +141,8 @@ public class IngestionConfigHybridIntegrationTest extends 
BaseClusterIntegration
     addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
 
     // Create and upload segments
-    ClusterIntegrationTestUtils
-        .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema, 
0, _segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles, 
offlineTableConfig, schema, 0, _segmentDir,
+        _tarDir);
     uploadSegments(getTableName(), _tarDir);
 
     // Push data into Kafka
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
index 065cca63de..31a2bedf9b 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.integration.tests;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
 import com.jayway.jsonpath.spi.cache.Cache;
 import com.jayway.jsonpath.spi.cache.CacheProvider;
 import java.io.File;
@@ -89,13 +88,16 @@ public class JsonPathClusterIntegrationTest extends 
BaseClusterIntegrationTest {
             .addSingleValueDimension(COMPLEX_MAP_STR_FIELD_NAME, 
DataType.STRING)
             .addMultiValueDimension(COMPLEX_MAP_STR_K3_FIELD_NAME, 
DataType.STRING).build();
     addSchema(schema);
-    List<TransformConfig> transformConfigs = Lists.newArrayList(
+    List<TransformConfig> transformConfigs = Arrays.asList(
         new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" + 
MY_MAP_STR_FIELD_NAME + ", '$.k1')"),
         new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" + 
MY_MAP_STR_FIELD_NAME + ", '$.k2')"),
         new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME,
             "jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')"));
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setIngestionConfig(ingestionConfig)
+            .build();
     addTableConfig(tableConfig);
 
     // Create and upload segments
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index 726572a7ca..68ec52dc5d 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -80,11 +79,14 @@ public class MapTypeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
         .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + 
SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
         .addSingleValueDimension(STRING_KEY_MAP_STR_FIELD_NAME, 
DataType.STRING)
         .addSingleValueDimension(INT_KEY_MAP_STR_FIELD_NAME, 
DataType.STRING).build();
-    List<TransformConfig> transformConfigs = Lists.newArrayList(
+    List<TransformConfig> transformConfigs = Arrays.asList(
         new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + 
STRING_KEY_MAP_FIELD_NAME + ")"),
         new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" + 
INT_KEY_MAP_FIELD_NAME + ")"));
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setIngestionConfig(ingestionConfig)
+            .build();
     addTableConfig(tableConfig);
 
     // Create and upload segments
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 4dbaa85846..779f7780c4 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -967,10 +967,13 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     _schemaFileName = SCHEMA_FILE_NAME_WITH_EXTRA_COLUMNS;
     addSchema(createSchema());
     TableConfig tableConfig = getOfflineTableConfig();
-    tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
-        Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch", 
"times(DaysSinceEpoch, 24)"),
-            new TransformConfig("NewAddedDerivedSecondsSinceEpoch", 
"times(times(DaysSinceEpoch, 24), 3600)"),
-            new TransformConfig("NewAddedDerivedMVStringDimension", 
"split(DestCityName, ', ')")), null, null));
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("NewAddedDerivedHoursSinceEpoch", 
"times(DaysSinceEpoch, 24)"),
+        new TransformConfig("NewAddedDerivedSecondsSinceEpoch", 
"times(times(DaysSinceEpoch, 24), 3600)"),
+        new TransformConfig("NewAddedDerivedMVStringDimension", 
"split(DestCityName, ', ')"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    tableConfig.setIngestionConfig(ingestionConfig);
     updateTableConfig(tableConfig);
 
     // Trigger reload
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
index fa67d8c62a..bfdef722fd 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
@@ -20,10 +20,10 @@ package org.apache.pinot.integration.tests;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.google.common.base.Function;
-import com.google.common.collect.Lists;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -89,8 +89,10 @@ public class SegmentWriterUploaderIntegrationTest extends 
BaseClusterIntegration
     batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_tarDir.getAbsolutePath());
     batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
     batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI, 
_controllerBaseApiUrl);
-    return new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-        null, null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigMap), 
"APPEND", "HOURLY"));
+    return ingestionConfig;
   }
 
   /**
diff --git 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index 9cbb840997..db2e811c93 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -54,8 +54,8 @@ public class DataPreprocessingHelperTest {
     Path outputPath = new Path("mockOutputPath");
     DataPreprocessingHelper dataPreprocessingHelper = new 
AvroDataPreprocessingHelper(inputPaths, outputPath);
 
-    BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null, 
"APPEND", "DAILY");
-    IngestionConfig ingestionConfig = new 
IngestionConfig(batchIngestionConfig, null, null, null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"APPEND", "DAILY"));
 
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTableName").setIngestionConfig(ingestionConfig)
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 1fbfcc59d1..5830f76e86 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -96,8 +96,8 @@ public class MergeRollupTaskGeneratorTest {
     assertTrue(pinotTaskConfigs.isEmpty());
 
     // Skip task generation, if REFRESH table
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", null), 
null, null, null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"REFRESH", null));
     offlineTableConfig = getOfflineTableConfig(new HashMap<>());
     offlineTableConfig.setIngestionConfig(ingestionConfig);
     pinotTaskConfigs = 
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index be3118a51b..f2a197c892 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -21,6 +21,7 @@ package 
org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
 import com.google.common.collect.Lists;
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -98,14 +99,18 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
     TableConfig tableConfigWithSortedCol =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T)
             .setSortedColumn(D1).build();
+    IngestionConfig ingestionConfigEpochHours = new IngestionConfig();
+    ingestionConfigEpochHours.setTransformConfigs(
+        Collections.singletonList(new TransformConfig(T_TRX, 
"toEpochHours(t)")));
     TableConfig tableConfigEpochHours =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
-            .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null, 
null, null,
-                Lists.newArrayList(new TransformConfig(T_TRX, 
"toEpochHours(t)")), null, null)).build();
+            
.setSortedColumn(D1).setIngestionConfig(ingestionConfigEpochHours).build();
+    IngestionConfig ingestionConfigSDF = new IngestionConfig();
+    ingestionConfigSDF.setTransformConfigs(
+        Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t, 
'yyyyMMddHH')")));
     TableConfig tableConfigSDF =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
-            .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null, 
null, null,
-                Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t, 
'yyyyMMddHH')")), null, null)).build();
+            
.setSortedColumn(D1).setIngestionConfig(ingestionConfigSDF).build();
     Schema schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, 
FieldSpec.DataType.STRING)
             .addMetric(M1, FieldSpec.DataType.INT)
diff --git 
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
index f48eb2195a..a6e4b9c32f 100644
--- 
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
+++ 
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
 import java.io.File;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -69,14 +70,12 @@ public class FileBasedSegmentWriterTest {
     Preconditions.checkState(_tmpDir.mkdirs());
     _outputDir = new File(_tmpDir, "segmentWriterOutputDir");
 
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("aSimpleMap_str", 
"jsonFormat(aSimpleMap)"));
-    transformConfigs.add(new TransformConfig("anAdvancedMap_str", 
"jsonFormat(anAdvancedMap)"));
-    Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath());
-    _ingestionConfig =
-        new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-            null, transformConfigs, null, null);
+    _ingestionConfig = new IngestionConfig();
+    _ingestionConfig.setBatchIngestionConfig(new 
BatchIngestionConfig(Collections.singletonList(
+        Collections.singletonMap(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath())), "APPEND",
+        "HOURLY"));
+    _ingestionConfig.setTransformConfigs(Arrays.asList(new 
TransformConfig("aSimpleMap_str", "jsonFormat(aSimpleMap)"),
+        new TransformConfig("anAdvancedMap_str", 
"jsonFormat(anAdvancedMap)")));
     _tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(_ingestionConfig)
             .setTimeColumnName(TIME_COLUMN_NAME).build();
@@ -101,7 +100,6 @@ public class FileBasedSegmentWriterTest {
   @Test
   public void testBatchConfigs()
       throws Exception {
-
     SegmentWriter segmentWriter = new FileBasedSegmentWriter();
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME).build();
@@ -112,7 +110,8 @@ public class FileBasedSegmentWriterTest {
       // expected
     }
 
-    tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, null, 
null, null));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    tableConfig.setIngestionConfig(ingestionConfig);
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing batchIngestionConfig");
@@ -120,8 +119,7 @@ public class FileBasedSegmentWriterTest {
       // expected
     }
 
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null, null));
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"APPEND", "HOURLY"));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing batchConfigMaps");
@@ -129,9 +127,7 @@ public class FileBasedSegmentWriterTest {
       // expected
     }
 
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(Collections.emptyList(), 
"APPEND", "HOURLY"), null, null, null,
-            null, null));
+    ingestionConfig.setBatchIngestionConfig(new 
BatchIngestionConfig(Collections.emptyList(), "APPEND", "HOURLY"));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing batchConfigMaps");
@@ -139,9 +135,8 @@ public class FileBasedSegmentWriterTest {
       // expected
     }
 
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(Collections.emptyMap()), "APPEND", 
"HOURLY"),
-            null, null, null, null, null));
+    ingestionConfig.setBatchIngestionConfig(
+        new 
BatchIngestionConfig(Collections.singletonList(Collections.emptyMap()), 
"APPEND", "HOURLY"));
     try {
       segmentWriter.init(tableConfig, _schema);
       Assert.fail("Should fail due to missing outputDirURI in batchConfigMap");
@@ -149,11 +144,9 @@ public class FileBasedSegmentWriterTest {
       // expected
     }
 
-    Map<String, String> batchConfigMap = new HashMap<>();
-    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath());
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"), 
null,
-            null, null, null, null));
+    ingestionConfig.setBatchIngestionConfig(new 
BatchIngestionConfig(Collections.singletonList(
+        Collections.singletonMap(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath())), "APPEND",
+        "HOURLY"));
     segmentWriter.init(tableConfig, _schema);
     segmentWriter.close();
   }
@@ -242,20 +235,19 @@ public class FileBasedSegmentWriterTest {
     FileUtils.deleteQuietly(_outputDir);
 
     // FIXED segment name
-    Map<String, String> batchConfigMap = 
_ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps().get(0);
-    Map<String, String> batchConfigMapOverride = new HashMap<>(batchConfigMap);
-    batchConfigMapOverride
-        .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, 
BatchConfigProperties.SegmentNameGeneratorType.FIXED);
-    batchConfigMapOverride.put(String
-            .format("%s.%s", 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
-                BatchConfigProperties.SEGMENT_NAME),
-        "customSegmentName");
+    Map<String, String> batchConfigMap = new HashMap<>();
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath());
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+        BatchConfigProperties.SegmentNameGeneratorType.FIXED);
+    batchConfigMap.put(String.format("%s.%s", 
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
+        BatchConfigProperties.SEGMENT_NAME), "customSegmentName");
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigMap), 
"APPEND", "HOURLY"));
+    
ingestionConfig.setTransformConfigs(_ingestionConfig.getTransformConfigs());
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
-            .setIngestionConfig(new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-                
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-                
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-                _ingestionConfig.getTransformConfigs(), null, null)).build();
+            .setIngestionConfig(ingestionConfig).build();
 
     SegmentWriter segmentWriter = new FileBasedSegmentWriter();
     segmentWriter.init(tableConfig, _schema);
@@ -274,14 +266,8 @@ public class FileBasedSegmentWriterTest {
     segmentWriter.close();
 
     // NORMALIZED segment name
-    batchConfigMapOverride = new HashMap<>(batchConfigMap);
-    
batchConfigMapOverride.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
         BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE);
-    tableConfig.setIngestionConfig(new IngestionConfig(
-        new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-            
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-            
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-        _ingestionConfig.getTransformConfigs(), null, null));
     segmentWriter.init(tableConfig, _schema);
 
     // write 2 records
@@ -297,15 +283,9 @@ public class FileBasedSegmentWriterTest {
     FileUtils.deleteQuietly(_outputDir);
 
     // SIMPLE segment name w/ sequenceId
-    batchConfigMapOverride = new HashMap<>(batchConfigMap);
-    batchConfigMapOverride
-        .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE, 
BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
-    batchConfigMapOverride.put(BatchConfigProperties.SEQUENCE_ID, "1001");
-    tableConfig.setIngestionConfig(new IngestionConfig(
-        new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-            
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-            
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-        _ingestionConfig.getTransformConfigs(), null, null));
+    batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+        BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+    batchConfigMap.put(BatchConfigProperties.SEQUENCE_ID, "1001");
     segmentWriter.init(tableConfig, _schema);
 
     // write 2 records
@@ -329,16 +309,18 @@ public class FileBasedSegmentWriterTest {
       throws Exception {
     FileUtils.deleteQuietly(_outputDir);
 
-    SegmentWriter segmentWriter = new FileBasedSegmentWriter();
-    Map<String, String> batchConfigMap = 
_ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps().get(0);
-    Map<String, String> batchConfigMapOverride = new HashMap<>(batchConfigMap);
-    batchConfigMapOverride.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+    Map<String, String> batchConfigMap = new HashMap<>();
+    batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI, 
_outputDir.getAbsolutePath());
+    batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigMap), 
"APPEND", "HOURLY"));
+    
ingestionConfig.setTransformConfigs(_ingestionConfig.getTransformConfigs());
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
-            .setIngestionConfig(new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-                
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-                
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), 
null, null,
-                _ingestionConfig.getTransformConfigs(), null, null)).build();
+            .setIngestionConfig(ingestionConfig).build();
+
+    SegmentWriter segmentWriter = new FileBasedSegmentWriter();
     segmentWriter.init(tableConfig, _schema);
 
     // write 3 records with same timestamp
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 3415a46995..1c37779339 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.recordtransformer;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -53,15 +54,17 @@ public class ExpressionTransformerTest {
         .addMultiValueDimension("map2_values", 
FieldSpec.DataType.INT).addMetric("cost", FieldSpec.DataType.DOUBLE)
         .addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG, 
"1:HOURS:EPOCH", "1:HOURS").build();
 
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("userId", "Groovy({user_id}, 
user_id)"));
-    transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName+' 
'+lastName}, firstName, lastName)"));
-    transformConfigs.add(new TransformConfig("maxBid", "Groovy({bids.max{ 
it.toBigDecimal() }}, bids)"));
-    transformConfigs.add(new TransformConfig("map2_keys", 
"Groovy({map2.sort()*.key}, map2)"));
-    transformConfigs.add(new TransformConfig("map2_values", 
"Groovy({map2.sort()*.value}, map2)"));
-    transformConfigs.add(new TransformConfig("hoursSinceEpoch", 
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("userId", "Groovy({user_id}, user_id)"),
+        new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, 
firstName, lastName)"),
+        new TransformConfig("maxBid", "Groovy({bids.max{ it.toBigDecimal() }}, 
bids)"),
+        new TransformConfig("map2_keys", "Groovy({map2.sort()*.key}, map2)"),
+        new TransformConfig("map2_values", "Groovy({map2.sort()*.value}, 
map2)"),
+        new TransformConfig("hoursSinceEpoch", 
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions")
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
+        .setIngestionConfig(ingestionConfig).build();
 
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
     DataTypeTransformer dataTypeTransformer = new 
DataTypeTransformer(pinotSchema);
@@ -145,12 +148,14 @@ public class ExpressionTransformerTest {
     // also specified in table config, ignore the schema setting
     
pinotSchema.getFieldSpecFor("hoursSinceEpoch").setTransformFunction("Groovy({timestamp/(1000)},
 timestamp)");
 
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("userId", "Groovy({user_id}, 
user_id)"));
-    transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName+' 
'+lastName}, firstName, lastName)"));
-    transformConfigs.add(new TransformConfig("hoursSinceEpoch", 
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("userId", "Groovy({user_id}, user_id)"),
+        new TransformConfig("fullName", "Groovy({firstName+' '+lastName}, 
firstName, lastName)"),
+        new TransformConfig("hoursSinceEpoch", 
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions")
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
+        .setIngestionConfig(ingestionConfig).build();
 
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
 
@@ -201,10 +206,13 @@ public class ExpressionTransformerTest {
     Schema pinotSchema = new Schema();
     DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("fullName", 
FieldSpec.DataType.STRING, true);
     pinotSchema.addField(dimensionFieldSpec);
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName + 
' ' + lastName}, firstName, lastName)"));
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists")
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
transformConfigs, null, null)).build();
+    List<TransformConfig> transformConfigs = Collections.singletonList(
+        new TransformConfig("fullName", "Groovy({firstName + ' ' + lastName}, 
firstName, lastName)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists").setIngestionConfig(ingestionConfig)
+            .build();
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, pinotSchema);
 
     GenericRow genericRow = new GenericRow();
@@ -220,7 +228,7 @@ public class ExpressionTransformerTest {
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "incoming"),
             new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"outgoing")).build();
     tableConfig = new 
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists")
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, null, 
null)).build();
+        .setIngestionConfig(new IngestionConfig()).build();
     expressionTransformer = new ExpressionTransformer(tableConfig, 
pinotSchema);
 
     genericRow = new GenericRow();
@@ -238,13 +246,14 @@ public class ExpressionTransformerTest {
         .addSingleValueDimension("b", 
FieldSpec.DataType.STRING).addSingleValueDimension("c", 
FieldSpec.DataType.STRING)
         .addSingleValueDimension("d", 
FieldSpec.DataType.STRING).addSingleValueDimension("e", 
FieldSpec.DataType.STRING)
         .addSingleValueDimension("f", FieldSpec.DataType.STRING).build();
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("d", "plus(x, 10)"));
-    transformConfigs.add(new TransformConfig("b", "plus(d, 10)"));
-    transformConfigs.add(new TransformConfig("a", "plus(b, 10)"));
-    transformConfigs.add(new TransformConfig("c", "plus(a, d)"));
-    transformConfigs.add(new TransformConfig("f", "plus(e, 10)"));
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("d", "plus(x, 10)"),
+        new TransformConfig("b", "plus(d, 10)"),
+        new TransformConfig("a", "plus(b, 10)"),
+        new TransformConfig("c", "plus(a, d)"),
+        new TransformConfig("f", "plus(e, 10)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testDerivedFunctions")
         .setIngestionConfig(ingestionConfig).build();
     ExpressionTransformer expressionTransformer = new 
ExpressionTransformer(tableConfig, schema);
@@ -268,11 +277,11 @@ public class ExpressionTransformerTest {
         .addSingleValueDimension("b", 
FieldSpec.DataType.INT).addSingleValueDimension("c", FieldSpec.DataType.INT)
         .build();
 
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("a", "plus(b,10)"));
-    transformConfigs.add(new TransformConfig("a", "plus(c,10)"));
-
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("a", "plus(b,10)"),
+        new TransformConfig("a", "plus(c,10)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testMultipleTransformFunctionSortOrder")
             .setIngestionConfig(ingestionConfig).build();
@@ -289,13 +298,13 @@ public class ExpressionTransformerTest {
         .build();
 
     // Define transform function dependencies: a -> (b,c), b -> d, d -> e, c 
-> (d,e)
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("a", "plus(b,c)"));
-    transformConfigs.add(new TransformConfig("b", "plus(d,10)"));
-    transformConfigs.add(new TransformConfig("d", "plus(e,10)"));
-    transformConfigs.add(new TransformConfig("c", "plus(d,e)"));
-
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("a", "plus(b,c)"),
+        new TransformConfig("b", "plus(d,10)"),
+        new TransformConfig("d", "plus(e,10)"),
+        new TransformConfig("c", "plus(d,e)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testNonCyclicTransformFunctionSortOrder")
             .setIngestionConfig(ingestionConfig).build();
@@ -318,12 +327,12 @@ public class ExpressionTransformerTest {
         .build();
 
     // Define transform function dependencies: a -> b, b -> c, c -> a
-    List<TransformConfig> transformConfigs = new ArrayList<>();
-    transformConfigs.add(new TransformConfig("a", "plus(b,10)"));
-    transformConfigs.add(new TransformConfig("b", "plus(c,10)"));
-    transformConfigs.add(new TransformConfig("c", "plus(a,10)"));
-
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
transformConfigs, null, null);
+    List<TransformConfig> transformConfigs = Arrays.asList(
+        new TransformConfig("a", "plus(b,10)"),
+        new TransformConfig("b", "plus(c,10)"),
+        new TransformConfig("c", "plus(a,10)"));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(transformConfigs);
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testRecrusiveTransformFunctionSortOrder")
             .setIngestionConfig(ingestionConfig).build();
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index f479fc351b..6cc0d2bfb7 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -87,35 +87,34 @@ public class RecordTransformerTest {
 
   @Test
   public void testFilterTransformer() {
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
 
     // expression false, not filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({svInt > 123}, 
svInt)"));
     GenericRow genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy({svInt > 
123}, svInt)"), null, null, null));
+    tableConfig.setIngestionConfig(ingestionConfig);
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({svInt <= 123}, 
svInt)"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy({svInt <= 
123}, svInt)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // value not found
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({notPresent == 
123}, notPresent)"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy({notPresent 
== 123}, notPresent)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // invalid function
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("Groovy(svInt == 
123)"), null, null, null));
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy(svInt == 123)"));
     try {
       new FilterTransformer(tableConfig);
       Assert.fail("Should have failed constructing FilterTransformer");
@@ -124,9 +123,8 @@ public class RecordTransformerTest {
     }
 
     // multi value column
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({svFloat.max() < 
500}, svFloat)"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new 
FilterConfig("Groovy({svFloat.max() < 500}, svFloat)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -181,75 +179,62 @@ public class RecordTransformerTest {
 
   @Test
   public void testScalarOps() {
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123"));
     GenericRow genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svInt = 123"), null, null, null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svDouble > 120"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svDouble > 120"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svDouble >= 123"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svDouble >= 123"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svDouble < 200"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svDouble < 200"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svDouble <= 123"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svDouble <= 123"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svLong != 125"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svLong != 125"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svLong = 123"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svLong = 123"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("between(svLong, 100, 
125)"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("between(svLong, 100, 
125)"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -267,36 +252,34 @@ public class RecordTransformerTest {
 
   @Test
   public void testObjectOps() {
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svNullString is null"));
     GenericRow genericRow = getNullColumnsRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("svNullString is 
null"), null, null, null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svInt is not null"));
     genericRow = getNullColumnsRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("svInt is not null"), 
null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("mvLong is not null"));
     genericRow = getNullColumnsRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("mvLong is not 
null"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("mvNullFloat is null"));
     genericRow = getNullColumnsRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null, new FilterConfig("mvNullFloat is 
null"), null, null, null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -304,22 +287,20 @@ public class RecordTransformerTest {
 
   @Test
   public void testLogicalScalarOps() {
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123 AND svDouble 
<= 200"));
     GenericRow genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null, 
null));
     RecordTransformer transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
 
     // expression true, filtered
+    ingestionConfig.setFilterConfig(new FilterConfig("svInt = 125 OR svLong <= 
200"));
     genericRow = getRecord();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, null,
-            new FilterConfig("svInt = 125 OR svLong <= 200"), null, null, 
null));
     transformer = new FilterTransformer(tableConfig);
     transformer.transform(genericRow);
     
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
index be0462dae3..b6c495d820 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
@@ -67,8 +67,8 @@ public class SegmentGenerationWithFilterRecordsTest {
   public void setup() {
     String filterFunction =
         "Groovy({((col2 < 1589007600000L) &&  (col3.max() < 4)) || col1 == 
\"B\"}, col1, col2, col3)";
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(null, null, new FilterConfig(filterFunction), 
null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setFilterConfig(new FilterConfig(filterFunction));
     _tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
     _schema = new 
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN, 
FieldSpec.DataType.STRING)
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 8afb04880c..861743ebb0 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -590,9 +590,10 @@ public class SegmentPreProcessorTest {
   public void testV1UpdateDefaultColumns()
       throws Exception {
     constructV1Segment();
-    _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
-        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null,
-        null));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(
+        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")));
+    _tableConfig.setIngestionConfig(ingestionConfig);
     
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
     checkUpdateDefaultColumns();
 
@@ -636,9 +637,10 @@ public class SegmentPreProcessorTest {
     SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
     assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
 
-    _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
-        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null,
-        null));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(
+        Collections.singletonList(new 
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")));
+    _tableConfig.setIngestionConfig(ingestionConfig);
     
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
     checkUpdateDefaultColumns();
 
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
index d1c5e660ee..9b54600e93 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
@@ -22,9 +22,8 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
@@ -90,16 +89,16 @@ public class IngestionUtilsTest {
 
     // Time field spec
     // only incoming
-    schema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "time"), null).build();
+    schema = new Schema.SchemaBuilder().addTime(
+        new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "time"), null).build();
     extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, 
schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("time"));
 
     // incoming and outgoing different column name
-    schema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "in"),
-            new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "out")).build();
+    schema = new Schema.SchemaBuilder().addTime(
+        new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "in"),
+        new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "out")).build();
     extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null, 
schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out")));
@@ -138,8 +137,8 @@ public class IngestionUtilsTest {
     Schema schema = new Schema();
 
     // filter config
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(null, null, new FilterConfig("Groovy({x > 100}, 
x)"), null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({x > 100}, x)"));
     Set<String> fields = 
IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
     Assert.assertEquals(fields.size(), 1);
     Assert.assertTrue(fields.containsAll(Sets.newHashSet("x")));
@@ -151,24 +150,23 @@ public class IngestionUtilsTest {
 
     // transform configs
     schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", 
FieldSpec.DataType.STRING).build();
-    List<TransformConfig> transformConfigs =
-        Lists.newArrayList(new TransformConfig("d1", "Groovy({function}, 
argument1, argument2)"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
+    ingestionConfig = new IngestionConfig();
+    ingestionConfig.setTransformConfigs(
+        Collections.singletonList(new TransformConfig("d1", 
"Groovy({function}, argument1, argument2)")));
     List<String> extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 3);
     Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", 
"argument2")));
 
     // groovy function, no arguments
-    transformConfigs = Lists.newArrayList(new TransformConfig("d1", 
"Groovy({function})"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("d1", "Groovy({function})")));
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("d1"));
 
     // inbuilt functions
     schema = new 
Schema.SchemaBuilder().addSingleValueDimension("hoursSinceEpoch", 
FieldSpec.DataType.LONG).build();
-    transformConfigs = Lists.newArrayList(new 
TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
+    ingestionConfig.setTransformConfigs(
+        Collections.singletonList(new TransformConfig("hoursSinceEpoch", 
"toEpochHours(timestampColumn)")));
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("timestampColumn", 
"hoursSinceEpoch")));
@@ -176,19 +174,17 @@ public class IngestionUtilsTest {
     // inbuilt functions with literal
     schema =
         new 
Schema.SchemaBuilder().addSingleValueDimension("tenMinutesSinceEpoch", 
FieldSpec.DataType.LONG).build();
-    transformConfigs =
-        Lists.newArrayList(new TransformConfig("tenMinutesSinceEpoch", 
"toEpochMinutesBucket(timestampColumn, 10)"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
+    ingestionConfig.setTransformConfigs(Collections.singletonList(
+        new TransformConfig("tenMinutesSinceEpoch", 
"toEpochMinutesBucket(timestampColumn, 10)")));
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 2);
     
Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch",
 "timestampColumn")));
 
     // inbuilt functions on DateTimeFieldSpec
-    schema = new Schema.SchemaBuilder()
-        .addDateTime("dateColumn", FieldSpec.DataType.STRING, 
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
-    transformConfigs =
-        Lists.newArrayList(new TransformConfig("dateColumn", 
"toDateTime(timestampColumn, 'yyyy-MM-dd')"));
-    ingestionConfig = new IngestionConfig(null, null, null, transformConfigs, 
null, null);
+    schema = new Schema.SchemaBuilder().addDateTime("dateColumn", 
FieldSpec.DataType.STRING,
+        "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
+    ingestionConfig.setTransformConfigs(
+        Collections.singletonList(new TransformConfig("dateColumn", 
"toDateTime(timestampColumn, 'yyyy-MM-dd')")));
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Lists.newArrayList("dateColumn", 
"timestampColumn")));
@@ -198,58 +194,41 @@ public class IngestionUtilsTest {
         .addSingleValueDimension("d2", 
FieldSpec.DataType.STRING).addMetric("m1", FieldSpec.DataType.INT)
         .addDateTime("dateColumn", FieldSpec.DataType.STRING, 
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
     schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)");
-    transformConfigs =
-        Lists.newArrayList(new TransformConfig("dateColumn", 
"toDateTime(timestampColumn, 'yyyy-MM-dd')"));
-    ingestionConfig =
-        new IngestionConfig(null, null, new FilterConfig("Groovy({d1 == 
\"10\"}, d1)"), transformConfigs, null, null);
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({d1 == \"10\"}, 
d1)"));
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 6);
     Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1", 
"dateColumn", "xy", "timestampColumn")));
 
     // filter + transform configs + schema fields  + schema transform + 
complex type configs
     schema = new Schema.SchemaBuilder().addSingleValueDimension("d1", 
FieldSpec.DataType.STRING)
-            .addSingleValueDimension("d2", FieldSpec.DataType.STRING)
-            .addMetric("m1", FieldSpec.DataType.INT)
-            .addDateTime("dateColumn", FieldSpec.DataType.STRING, 
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS")
-            .build();
+        .addSingleValueDimension("d2", 
FieldSpec.DataType.STRING).addMetric("m1", FieldSpec.DataType.INT)
+        .addDateTime("dateColumn", FieldSpec.DataType.STRING, 
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
     schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)");
-    TransformConfig transformConfig = new TransformConfig("dateColumn", 
"toDateTime(timestampColumn, 'yyyy-MM-dd')");
-    transformConfigs = Lists.newArrayList(transformConfig);
-    List<String> fieldsToUnnest = Arrays.asList("before.test", "after.test");
-    Map<String, String> prefixesToRename = new HashMap<>();
-    prefixesToRename.put("before", "after");
-    ComplexTypeConfig complexTypeConfigs = new 
ComplexTypeConfig(fieldsToUnnest, ".",
-            ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, 
prefixesToRename);
-    FilterConfig filterConfig = new FilterConfig("Groovy({d1 == \"10\"}, d1)");
-    ingestionConfig = new IngestionConfig(null, null, filterConfig, 
transformConfigs, complexTypeConfigs, null);
+    ingestionConfig.setComplexTypeConfig(new 
ComplexTypeConfig(Arrays.asList("before.test", "after.test"), ".",
+        ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, 
Collections.singletonMap("before", "after")));
     extract = new 
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema));
     Assert.assertEquals(extract.size(), 8);
     List<String> expectedColumns =
-            Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy", 
"timestampColumn", "before", "after");
+        Arrays.asList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn", 
"before", "after");
     Assert.assertTrue(extract.containsAll(expectedColumns));
   }
 
   @Test
   public void testExtractFieldsAggregationConfig() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
     Schema schema = new Schema();
 
-    List<AggregationConfig> aggregationConfigs = Arrays.asList(new 
AggregationConfig("d1", "SUM(s1)"));
-    IngestionConfig ingestionConfig = new IngestionConfig(null, null, null, 
null, null, aggregationConfigs);
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("d1", "SUM(s1)")));
     Set<String> fields = 
IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
     Assert.assertEquals(fields.size(), 1);
     Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MIN(s1)"));
-    ingestionConfig = new IngestionConfig(null, null, null, null, null, 
aggregationConfigs);
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("d1", "MIN(s1)")));
     fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema);
     Assert.assertEquals(fields.size(), 1);
     Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MAX(s1)"));
-    ingestionConfig = new IngestionConfig(null, null, null, null, null, 
aggregationConfigs);
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("d1", "MAX(s1)")));
     fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, 
schema);
     Assert.assertEquals(fields.size(), 1);
     Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index f074f6c695..f1578b1970 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -45,7 +45,9 @@ import 
org.apache.pinot.spi.config.table.ingestion.FilterConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -229,31 +231,24 @@ public class TableConfigUtilsTest {
     TableConfigUtils.validate(tableConfig, schema);
 
     // null filter config, transform config
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, null, 
null)).build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    tableConfig.setIngestionConfig(ingestionConfig);
     TableConfigUtils.validate(tableConfig, schema);
 
     // null filter function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig(null), null, null, null)).build();
+    ingestionConfig.setFilterConfig(new FilterConfig(null));
     TableConfigUtils.validate(tableConfig, schema);
 
     // valid filterFunction
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, new 
FilterConfig("startsWith(columnX, \"myPrefix\")"), null, null, null))
-        .build();
+    ingestionConfig.setFilterConfig(new FilterConfig("startsWith(columnX, 
\"myPrefix\")"));
     TableConfigUtils.validate(tableConfig, schema);
 
     // valid filterFunction
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy({x == 10}, x)"), null, null, null))
-        .build();
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({x == 10}, x)"));
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid filter function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("Groovy(badExpr)"), null, null, null))
-        .build();
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy(badExpr)"));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail on invalid filter function string");
@@ -261,9 +256,7 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, new 
FilterConfig("fakeFunction(xx)"), null, null, null))
-        .build();
+    ingestionConfig.setFilterConfig(new FilterConfig("fakeFunction(xx)"));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid filter function");
@@ -272,14 +265,12 @@ public class TableConfigUtilsTest {
     }
 
     // empty transform configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, null, 
Collections.emptyList(), null, null)).build();
+    ingestionConfig.setFilterConfig(null);
+    ingestionConfig.setTransformConfigs(Collections.emptyList());
     TableConfigUtils.validate(tableConfig, schema);
 
     // transformed column not in schema
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")),
-            null, null)).build();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("myCol", "reverse(anotherCol)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for transformedColumn not present in schema");
@@ -288,29 +279,26 @@ public class TableConfigUtilsTest {
     }
 
     // using a transformation column in an aggregation
-    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-        .addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build();
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("twice", "col * 2")),
-            null, Lists.newArrayList((new AggregationConfig("twiceSum", 
"SUM(twice)"))))).build();
+    schema =
+        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("twiceSum", 
FieldSpec.DataType.DOUBLE).build();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("twice", "col * 2")));
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("twiceSum", "SUM(twice)")));
     TableConfigUtils.validate(tableConfig, schema);
 
+    // valid transform configs
     schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .build();
-    // valid transform configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)")),
-            null, null)).build();
+    ingestionConfig.setAggregationConfigs(null);
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("myCol", "reverse(anotherCol)")));
     TableConfigUtils.validate(tableConfig, schema);
 
+    // valid transform configs
     schema =
         new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
 FieldSpec.DataType.STRING)
             .addMetric("transformedCol", FieldSpec.DataType.LONG).build();
-    // valid transform configs
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(anotherCol)"),
-            new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")), 
null, null)).build();
+    ingestionConfig.setTransformConfigs(Arrays.asList(new 
TransformConfig("myCol", "reverse(anotherCol)"),
+        new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")));
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid transform config since Groovy is disabled
@@ -322,9 +310,8 @@ public class TableConfigUtilsTest {
     }
 
     // invalid filter config since Groovy is disabled
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, new 
FilterConfig("Groovy({timestamp > 0}, timestamp)"), null, null, null))
-        .build();
+    ingestionConfig.setTransformConfigs(null);
+    ingestionConfig.setFilterConfig(new FilterConfig("Groovy({timestamp > 0}, 
timestamp)"));
     try {
       TableConfigUtils.validate(tableConfig, schema, null, true);
       Assert.fail("Should fail when Groovy functions disabled but found in 
filter config");
@@ -333,9 +320,8 @@ public class TableConfigUtilsTest {
     }
 
     // null transform column name
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig(null, "reverse(anotherCol)")),
-            null, null)).build();
+    ingestionConfig.setFilterConfig(null);
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig(null, "reverse(anotherCol)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null column name in transform config");
@@ -344,9 +330,7 @@ public class TableConfigUtilsTest {
     }
 
     // null transform function string
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", null)), null, null))
-        .build();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("myCol", null)));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for null transform function in transform 
config");
@@ -355,9 +339,7 @@ public class TableConfigUtilsTest {
     }
 
     // invalid function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "fakeFunction(col)")),
-            null, null)).build();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("myCol", "fakeFunction(col)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
@@ -366,9 +348,7 @@ public class TableConfigUtilsTest {
     }
 
     // invalid function
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "Groovy(badExpr)")), null,
-            null)).build();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("myCol", "Groovy(badExpr)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail for invalid transform function in transform 
config");
@@ -377,9 +357,7 @@ public class TableConfigUtilsTest {
     }
 
     // input field name used as destination field
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("myCol", "reverse(myCol)")), null,
-            null)).build();
+    ingestionConfig.setTransformConfigs(Collections.singletonList(new 
TransformConfig("myCol", "reverse(myCol)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to use of myCol as arguments and 
columnName");
@@ -388,10 +366,8 @@ public class TableConfigUtilsTest {
     }
 
     // input field name used as destination field
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-            new IngestionConfig(null, null, null,
-                Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y 
+ myCol}, x, myCol, y)")), null, null))
-        .build();
+    ingestionConfig.setTransformConfigs(
+        Collections.singletonList(new TransformConfig("myCol", "Groovy({x + y 
+ myCol}, x, myCol, y)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to use of myCol as arguments and 
columnName");
@@ -400,10 +376,8 @@ public class TableConfigUtilsTest {
     }
 
     // duplicate transform config
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null,
-            Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new 
TransformConfig("myCol", "lower(y)")),
-            null, null)).build();
+    ingestionConfig.setTransformConfigs(
+        Arrays.asList(new TransformConfig("myCol", "reverse(x)"), new 
TransformConfig("myCol", "lower(y)")));
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to duplicate transform config");
@@ -412,19 +386,16 @@ public class TableConfigUtilsTest {
     }
 
     // derived columns - should pass
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
-        new IngestionConfig(null, null, null, Lists.newArrayList(new 
TransformConfig("transformedCol", "reverse(x)"),
-            new TransformConfig("myCol", "lower(transformedCol)")), null, 
null)).build();
+    ingestionConfig.setTransformConfigs(Arrays.asList(new 
TransformConfig("transformedCol", "reverse(x)"),
+        new TransformConfig("myCol", "lower(transformedCol)")));
     TableConfigUtils.validate(tableConfig, schema);
 
     // invalid field name in schema with matching prefix from 
complexConfigType's prefixesToRename
-    HashMap<String, String> prefixesToRename = new HashMap<>();
-    prefixesToRename.put("after.", "");
-    ComplexTypeConfig complexConfig = new ComplexTypeConfig(null, ".", null, 
prefixesToRename);
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, 
complexConfig, null)).build();
+    ingestionConfig.setTransformConfigs(null);
+    ingestionConfig.setComplexTypeConfig(
+        new ComplexTypeConfig(null, ".", null, 
Collections.singletonMap("after.", "")));
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
-            .addMultiValueDimension("after.test", 
FieldSpec.DataType.STRING).build();
+        .addMultiValueDimension("after.test", 
FieldSpec.DataType.STRING).build();
     try {
       TableConfigUtils.validate(tableConfig, schema);
       Assert.fail("Should fail due to name conflict from field name in schema 
with a prefix in prefixesToRename");
@@ -435,44 +406,30 @@ public class TableConfigUtilsTest {
 
   @Test
   public void ingestionAggregationConfigsTest() {
-    List<AggregationConfig> aggregationConfigs = Arrays.asList(new 
AggregationConfig("d1", "SUM(s1)"));
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-
-    Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+        .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP, 
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("d1", "SUM(s1)")));
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            
.setIngestionConfig(ingestionConfig).setAggregateMetrics(true).build();
-
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+            .setIngestionConfig(ingestionConfig).build();
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
-      Assert.fail("Should fail due to aggregateMetrics being set");
+      Assert.fail("Should fail due to destination column not being in schema");
     } catch (IllegalStateException e) {
       // expected
     }
 
-    schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
+    schema.addField(new DimensionFieldSpec("d1", FieldSpec.DataType.DOUBLE, 
true));
+    tableConfig.getIndexingConfig().setAggregateMetrics(true);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
-      Assert.fail("Should fail due to destination column not being in schema");
+      Assert.fail("Should fail due to aggregateMetrics being set");
     } catch (IllegalStateException e) {
       // expected
     }
 
-    schema =
-        new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("d1", 
FieldSpec.DataType.DOUBLE)
-            .build();
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    tableConfig.getIndexingConfig().setAggregateMetrics(false);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to aggregation column being a dimension");
@@ -480,14 +437,8 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
-    aggregationConfigs = Arrays.asList(new AggregationConfig(null, null));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    schema.addField(new MetricFieldSpec("m1", FieldSpec.DataType.DOUBLE));
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig(null, null)));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to null columnName/aggregationFunction");
@@ -495,13 +446,8 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"), 
new AggregationConfig("d1", "SUM(s2)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    ingestionConfig.setAggregationConfigs(
+        Arrays.asList(new AggregationConfig("m1", "SUM(s1)"), new 
AggregationConfig("m1", "SUM(s2)")));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to duplicate destination column");
@@ -509,13 +455,7 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM s1"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("m1", "SUM s1")));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to invalid aggregation function");
@@ -523,13 +463,8 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", 
"DISTINCTCOUNTHLL(s1)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    ingestionConfig.setAggregationConfigs(
+        Collections.singletonList(new AggregationConfig("m1", 
"DISTINCTCOUNTHLL(s1)")));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to not supported aggregation function");
@@ -537,13 +472,7 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "s1 + s2"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("m1", "s1 + s2")));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to multiple arguments");
@@ -551,13 +480,7 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1 - 
s2)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("m1", "SUM(s1 - s2)")));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to inner value not being a column");
@@ -565,51 +488,28 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(d1)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("m1", "SUM(m1)")));
     TableConfigUtils.validateIngestionConfig(tableConfig, schema);
 
-    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE)
-        .addMetric("d2", FieldSpec.DataType.DOUBLE).build();
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
+    ingestionConfig.setAggregationConfigs(Collections.singletonList(new 
AggregationConfig("m1", "SUM(s1)")));
+    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
 
+    schema.addField(new MetricFieldSpec("m2", FieldSpec.DataType.DOUBLE));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, schema);
       Assert.fail("Should fail due to one metric column not being aggregated");
     } catch (IllegalStateException e) {
       // expected
     }
-
-
-    schema = new 
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1", 
FieldSpec.DataType.DOUBLE).build();
-    aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
-    ingestionConfig =
-        new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-    tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-            .setIngestionConfig(ingestionConfig).build();
-    TableConfigUtils.validateIngestionConfig(tableConfig, schema);
   }
 
   @Test
   public void ingestionStreamConfigsTest() {
     Map<String, String> streamConfigs = getStreamConfigs();
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs, streamConfigs)), null,
-            null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)));
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
             .setIngestionConfig(ingestionConfig).build();
 
     // only 1 stream config allowed
@@ -621,9 +521,7 @@ public class TableConfigUtilsTest {
     }
 
     // stream config should be valid
-    ingestionConfig =
-        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null, 
null);
-    tableConfig.setIngestionConfig(ingestionConfig);
+    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Collections.singletonList(streamConfigs)));
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
     streamConfigs.remove(StreamConfigProperties.STREAM_TYPE);
@@ -645,11 +543,12 @@ public class TableConfigUtilsTest {
     batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
     batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, 
"org.foo.Reader");
 
-    IngestionConfig ingestionConfig =
-        new IngestionConfig(new 
BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null, 
null),
-            null, null, null, null, null);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    // TODO: Check if we should allow duplicate config maps
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Arrays.asList(batchConfigMap, 
batchConfigMap), null, null));
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig)
             .build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
   }
@@ -665,15 +564,15 @@ public class TableConfigUtilsTest {
     batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS, 
"org.foo.Reader");
 
     // valid dimension table ingestion config
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigMap), 
"REFRESH", null));
     TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
-        .setIngestionConfig(new IngestionConfig(
-            new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, 
batchConfigMap), "REFRESH", null), null, null,
-            null, null, null)).build();
+        .setIngestionConfig(ingestionConfig).build();
     TableConfigUtils.validateIngestionConfig(tableConfig, null);
 
     // dimension tables should have batch ingestion config
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
-        .setIngestionConfig(new IngestionConfig(null, null, null, null, null, 
null)).build();
+    ingestionConfig.setBatchIngestionConfig(null);
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for Dimension table without batch ingestion 
config");
@@ -682,10 +581,8 @@ public class TableConfigUtilsTest {
     }
 
     // dimension tables should have batch ingestion config of type REFRESH
-    tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
-        .setIngestionConfig(new IngestionConfig(
-            new BatchIngestionConfig(Lists.newArrayList(batchConfigMap, 
batchConfigMap), "APPEND", null), null, null,
-            null, null, null)).build();
+    ingestionConfig.setBatchIngestionConfig(
+        new BatchIngestionConfig(Collections.singletonList(batchConfigMap), 
"APPEND", null));
     try {
       TableConfigUtils.validateIngestionConfig(tableConfig, null);
       Assert.fail("Should fail for Dimension table with ingestion type APPEND 
(should be REFRESH)");
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
index ba604f8b5e..2e7c475ff9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
@@ -33,7 +33,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
 public class BatchIngestionConfig extends BaseJsonConfig {
 
   @JsonPropertyDescription("Configs for all the batch sources to ingest from")
-  private final List<Map<String, String>> _batchConfigMaps;
+  private List<Map<String, String>> _batchConfigMaps;
 
   @JsonPropertyDescription("Ingestion type APPEND or REFRESH")
   private String _segmentIngestionType;
@@ -63,6 +63,10 @@ public class BatchIngestionConfig extends BaseJsonConfig {
     return _segmentIngestionFrequency;
   }
 
+  public void setBatchConfigMaps(List<Map<String, String>> batchConfigMaps) {
+    _batchConfigMaps = batchConfigMaps;
+  }
+
   public void setSegmentIngestionType(String segmentIngestionType) {
     _segmentIngestionType = segmentIngestionType;
   }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 6b8329410a..ba471d899b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.spi.config.table.ingestion;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import java.util.List;
 import javax.annotation.Nullable;
@@ -50,14 +48,11 @@ public class IngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Configs related to record aggregation function 
applied during ingestion")
   private List<AggregationConfig> _aggregationConfigs;
 
-
-  @JsonCreator
-  public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable 
BatchIngestionConfig batchIngestionConfig,
-      @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig 
streamIngestionConfig,
-      @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
-      @JsonProperty("transformConfigs") @Nullable List<TransformConfig> 
transformConfigs,
-      @JsonProperty("complexTypeConfig") @Nullable ComplexTypeConfig 
complexTypeConfig,
-      @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig> 
aggregationConfigs) {
+  @Deprecated
+  public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
+      @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable 
FilterConfig filterConfig,
+      @Nullable List<TransformConfig> transformConfigs, @Nullable 
ComplexTypeConfig complexTypeConfig,
+      @Nullable List<AggregationConfig> aggregationConfigs) {
     _batchIngestionConfig = batchIngestionConfig;
     _streamIngestionConfig = streamIngestionConfig;
     _filterConfig = filterConfig;
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index e9831ea622..f50f45746f 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -19,7 +19,8 @@
 package org.apache.pinot.spi.utils;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -53,11 +54,10 @@ public class IngestionConfigUtilsTest {
         new 
TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build();
 
     // get from ingestion config (when not present in indexing config)
-    Map<String, String> streamConfigMap = new HashMap<>();
-    streamConfigMap.put("streamType", "kafka");
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(null, new 
StreamIngestionConfig(Lists.newArrayList(streamConfigMap)), null, null, null,
-            null));
+    Map<String, String> streamConfigMap = 
Collections.singletonMap("streamType", "kafka");
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(Collections.singletonList(streamConfigMap)));
+    tableConfig.setIngestionConfig(ingestionConfig);
     Map<String, String> actualStreamConfigsMap = 
IngestionConfigUtils.getStreamConfigMap(tableConfig);
     Assert.assertEquals(actualStreamConfigsMap.size(), 1);
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
@@ -74,9 +74,8 @@ public class IngestionConfigUtilsTest {
     Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
 
     // fail if multiple found
-    tableConfig.setIngestionConfig(new IngestionConfig(null,
-        new StreamIngestionConfig(Lists.newArrayList(streamConfigMap, 
deprecatedStreamConfigMap)), null, null, null,
-        null));
+    ingestionConfig.setStreamIngestionConfig(
+        new StreamIngestionConfig(Arrays.asList(streamConfigMap, 
deprecatedStreamConfigMap)));
     try {
       IngestionConfigUtils.getStreamConfigMap(tableConfig);
       Assert.fail("Should fail for multiple stream configs");
@@ -103,9 +102,10 @@ public class IngestionConfigUtilsTest {
   @Test
   public void testGetPushFrequency() {
     // get from ingestion config, when not present in segmentsConfig
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null, null));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"APPEND", "HOURLY"));
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setIngestionConfig(ingestionConfig).build();
     
Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig),
 "HOURLY");
 
     // get from ingestion config, even if present in segmentsConfig
@@ -128,9 +128,10 @@ public class IngestionConfigUtilsTest {
   @Test
   public void testGetPushType() {
     // get from ingestion config, when not present in segmentsConfig
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
-    tableConfig.setIngestionConfig(
-        new IngestionConfig(new BatchIngestionConfig(null, "APPEND", 
"HOURLY"), null, null, null, null, null));
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"APPEND", "HOURLY"));
+    TableConfig tableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setIngestionConfig(ingestionConfig).build();
     
Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
 "APPEND");
 
     // get from ingestion config, even if present in segmentsConfig


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to