This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 36298f0aa4 [Clean up] Simplify IngestionConfig construction (#8743)
36298f0aa4 is described below
commit 36298f0aa4d3d8e2b8d30768cd8cef5ee4edec7a
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed May 25 10:51:10 2022 -0700
[Clean up] Simplify IngestionConfig construction (#8743)
---
.../common/utils/config/TableConfigUtils.java | 25 +-
.../common/utils/config/TableConfigSerDeTest.java | 35 ++-
.../connector/flink/http/PinotConnectionUtils.java | 46 ++--
.../flink/sink/PinotSinkIntegrationTest.java | 6 +-
.../pinot/controller/util/FileIngestionHelper.java | 8 +-
.../helix/core/PinotHelixResourceManagerTest.java | 6 +-
.../core/retention/SegmentLineageCleanupTest.java | 4 +-
.../apache/pinot/core/util/SchemaUtilsTest.java | 11 +-
.../IngestionConfigHybridIntegrationTest.java | 26 +-
.../tests/JsonPathClusterIntegrationTest.java | 10 +-
.../tests/MapTypeClusterIntegrationTest.java | 10 +-
.../tests/OfflineClusterIntegrationTest.java | 11 +-
.../SegmentWriterUploaderIntegrationTest.java | 8 +-
.../preprocess/DataPreprocessingHelperTest.java | 4 +-
.../mergerollup/MergeRollupTaskGeneratorTest.java | 4 +-
.../RealtimeToOfflineSegmentsTaskExecutorTest.java | 13 +-
.../filebased/FileBasedSegmentWriterTest.java | 100 ++++----
.../ExpressionTransformerTest.java | 95 ++++----
.../recordtransformer/RecordTransformerTest.java | 83 +++----
.../SegmentGenerationWithFilterRecordsTest.java | 4 +-
.../index/loader/SegmentPreProcessorTest.java | 14 +-
.../segment/local/utils/IngestionUtilsTest.java | 81 +++----
.../segment/local/utils/TableConfigUtilsTest.java | 263 +++++++--------------
.../table/ingestion/BatchIngestionConfig.java | 6 +-
.../config/table/ingestion/IngestionConfig.java | 15 +-
.../pinot/spi/utils/IngestionConfigUtilsTest.java | 31 +--
26 files changed, 384 insertions(+), 535 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
index e515249761..f429cdf7b3 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TableConfigUtils.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.commons.collections.MapUtils;
import org.apache.helix.ZNRecord;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -218,8 +219,7 @@ public class TableConfigUtils {
}
/**
- * Helper method to convert from legacy/deprecated configs into current
version
- * of TableConfig.
+ * Helper method to convert from legacy/deprecated configs into current
version of TableConfig.
* <ul>
* <li>Moves deprecated ingestion related configs into Ingestion
Config.</li>
* <li>The conversion happens in-place, the specified tableConfig is
mutated in-place.</li>
@@ -247,7 +247,6 @@ public class TableConfigUtils {
if (batchIngestionConfig.getSegmentIngestionType() == null) {
batchIngestionConfig.setSegmentIngestionType(segmentPushType);
}
-
if (batchIngestionConfig.getSegmentIngestionFrequency() == null) {
batchIngestionConfig.setSegmentIngestionFrequency(segmentPushFrequency);
}
@@ -258,32 +257,26 @@ public class TableConfigUtils {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
if (streamIngestionConfig == null) {
- Map<String, String> streamConfigs = indexingConfig.getStreamConfigs();
-
// Only set the new config if the deprecated one is set.
- if (streamConfigs != null && !streamConfigs.isEmpty()) {
+ Map<String, String> streamConfigs = indexingConfig.getStreamConfigs();
+ if (MapUtils.isNotEmpty(streamConfigs)) {
streamIngestionConfig = new
StreamIngestionConfig(Collections.singletonList(streamConfigs));
}
}
if (ingestionConfig == null) {
if (batchIngestionConfig != null || streamIngestionConfig != null) {
- ingestionConfig = new IngestionConfig(batchIngestionConfig,
streamIngestionConfig, null, null, null, null);
- }
- } else {
- if (batchIngestionConfig != null) {
+ ingestionConfig = new IngestionConfig();
ingestionConfig.setBatchIngestionConfig(batchIngestionConfig);
- }
-
- if (streamIngestionConfig != null) {
ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
}
+ } else {
+ ingestionConfig.setBatchIngestionConfig(batchIngestionConfig);
+ ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
}
// Set the new config fields.
- if (ingestionConfig != null) {
- tableConfig.setIngestionConfig(ingestionConfig);
- }
+ tableConfig.setIngestionConfig(ingestionConfig);
// Clear the deprecated ones.
indexingConfig.setStreamConfigs(null);
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 05f12d04a0..bdbd41b685 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.common.utils.config;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.Lists;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -268,27 +267,21 @@ public class TableConfigSerDeTest {
}
{
// With ingestion config
- List<AggregationConfig> aggregationConfigs = Lists.newArrayList(new
AggregationConfig("SUM__bar", "SUM(bar)"),
- new AggregationConfig("MIN_foo", "MIN(foo)"));
- List<TransformConfig> transformConfigs =
- Lists.newArrayList(new TransformConfig("bar", "func(moo)"), new
TransformConfig("zoo", "myfunc()"));
- Map<String, String> batchConfigMap = new HashMap<>();
- batchConfigMap.put("batchType", "s3");
- Map<String, String> streamConfigMap = new HashMap<>();
- streamConfigMap.put("streamType", "kafka");
- List<Map<String, String>> streamConfigMaps = new ArrayList<>();
- streamConfigMaps.add(streamConfigMap);
- List<Map<String, String>> batchConfigMaps = new ArrayList<>();
- batchConfigMaps.add(batchConfigMap);
- List<String> fieldsToUnnest = Arrays.asList("c1, c2");
- Map<String, String> prefixesToRename = new HashMap<>();
- IngestionConfig ingestionConfig =
- new IngestionConfig(new BatchIngestionConfig(batchConfigMaps,
"APPEND", "HOURLY"),
- new StreamIngestionConfig(streamConfigMaps), new
FilterConfig("filterFunc(foo)"), transformConfigs,
- new ComplexTypeConfig(fieldsToUnnest, ".",
ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
- prefixesToRename), aggregationConfigs);
- TableConfig tableConfig =
tableConfigBuilder.setIngestionConfig(ingestionConfig).build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new
BatchIngestionConfig(Collections.singletonList(Collections.singletonMap("batchType",
"s3")), "APPEND",
+ "HOURLY"));
+ ingestionConfig.setStreamIngestionConfig(
+ new
StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType",
"kafka"))));
+ ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
+ ingestionConfig.setTransformConfigs(
+ Arrays.asList(new TransformConfig("bar", "func(moo)"), new
TransformConfig("zoo", "myfunc()")));
+ ingestionConfig.setComplexTypeConfig(new
ComplexTypeConfig(Arrays.asList("c1", "c2"), ".",
+ ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
Collections.emptyMap()));
+ ingestionConfig.setAggregationConfigs(
+ Arrays.asList(new AggregationConfig("SUM__bar", "SUM(bar)"), new
AggregationConfig("MIN_foo", "MIN(foo)")));
+ TableConfig tableConfig =
tableConfigBuilder.setIngestionConfig(ingestionConfig).build();
checkIngestionConfig(tableConfig);
// Serialize then de-serialize
diff --git
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
index 500b8fed26..c6a1b43da9 100644
---
a/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
+++
b/pinot-connectors/pinot-flink-connector/src/main/java/org/apache/pinot/connector/flink/http/PinotConnectionUtils.java
@@ -18,8 +18,7 @@
*/
package org.apache.pinot.connector.flink.http;
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -48,7 +47,7 @@ public final class PinotConnectionUtils {
}
public static TableConfig getTableConfig(ControllerRequestClient client,
String tableName, String tableType) {
- TableConfig tableConfig = null;
+ TableConfig tableConfig;
try {
tableConfig = client.getTableConfig(tableName,
TableType.valueOf(tableType));
} catch (Exception e) {
@@ -57,37 +56,34 @@ public final class PinotConnectionUtils {
LOGGER.info("fetched pinot config {}", tableConfig);
- Map<String, String> newBatchConfigMaps = new HashMap<>();
+ Map<String, String> newBatchConfigMap = new HashMap<>();
// append the batch config of controller URI
String controllerBaseUrl =
client.getControllerRequestURLBuilder().getBaseUrl();
- newBatchConfigMaps.put("push.controllerUri", controllerBaseUrl);
- newBatchConfigMaps.put("outputDirURI", "/tmp/pinotoutput");
+ newBatchConfigMap.put("push.controllerUri", controllerBaseUrl);
+ newBatchConfigMap.put("outputDirURI", "/tmp/pinotoutput");
+
IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
if (ingestionConfig == null) {
- tableConfig.setIngestionConfig(
- new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND",
"HOURLY"),
- null, null, null, null, null));
+ ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new
BatchIngestionConfig(Collections.singletonList(newBatchConfigMap), "APPEND",
"HOURLY"));
+ tableConfig.setIngestionConfig(ingestionConfig);
return tableConfig;
}
- if (ingestionConfig.getBatchIngestionConfig() == null) {
- tableConfig.setIngestionConfig(
- new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(newBatchConfigMaps), "APPEND",
"HOURLY"),
- null, ingestionConfig.getFilterConfig(),
ingestionConfig.getTransformConfigs(),
- ingestionConfig.getComplexTypeConfig(),
ingestionConfig.getAggregationConfigs()));
+
+ BatchIngestionConfig batchIngestionConfig =
ingestionConfig.getBatchIngestionConfig();
+ if (batchIngestionConfig == null) {
+ ingestionConfig.setBatchIngestionConfig(
+ new
BatchIngestionConfig(Collections.singletonList(newBatchConfigMap), "APPEND",
"HOURLY"));
return tableConfig;
}
- List<Map<String, String>> batchConfigMaps =
- ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps() == null
? new ArrayList<>()
- : ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps();
- batchConfigMaps.add(newBatchConfigMaps);
-
- tableConfig.setIngestionConfig(new IngestionConfig(
- new BatchIngestionConfig(batchConfigMaps,
ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-
ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()), null,
- ingestionConfig.getFilterConfig(),
ingestionConfig.getTransformConfigs(),
- ingestionConfig.getComplexTypeConfig(),
ingestionConfig.getAggregationConfigs()));
-
+ List<Map<String, String>> batchConfigMaps =
batchIngestionConfig.getBatchConfigMaps();
+ if (batchConfigMaps == null) {
+
batchIngestionConfig.setBatchConfigMaps(Collections.singletonList(newBatchConfigMap));
+ } else {
+ batchConfigMaps.add(newBatchConfigMap);
+ }
return tableConfig;
}
}
diff --git
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
index a9a9086716..d48dc6016c 100644
---
a/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
+++
b/pinot-connectors/pinot-flink-connector/src/test/java/org/apache/pinot/connector/flink/sink/PinotSinkIntegrationTest.java
@@ -79,9 +79,9 @@ public class PinotSinkIntegrationTest extends
BaseClusterIntegrationTest {
batchConfigs.put(BatchConfigProperties.OUTPUT_DIR_URI,
_tarDir.getAbsolutePath());
batchConfigs.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
batchConfigs.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
_controllerBaseApiUrl);
- IngestionConfig ingestionConfig =
- new IngestionConfig(new
BatchIngestionConfig(Collections.singletonList(batchConfigs), "APPEND",
"HOURLY"), null,
- null, null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigs),
"APPEND", "HOURLY"));
_tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(ingestionConfig)
.build();
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
index 1b1238c4a2..380b62c55b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/FileIngestionHelper.java
@@ -19,13 +19,13 @@
package org.apache.pinot.controller.util;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
@@ -129,7 +129,7 @@ public class FileIngestionHelper {
batchConfigMapOverride.put(segmentNamePostfixProp,
String.valueOf(System.currentTimeMillis()));
}
BatchIngestionConfig batchIngestionConfigOverride =
- new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
+ new
BatchIngestionConfig(Collections.singletonList(batchConfigMapOverride),
IngestionConfigUtils.getBatchSegmentIngestionType(_tableConfig),
IngestionConfigUtils.getBatchSegmentIngestionFrequency(_tableConfig));
@@ -147,8 +147,8 @@ public class FileIngestionHelper {
TarGzCompressionUtils.createTarGzFile(new File(outputDir, segmentName),
segmentTarFile);
// Upload segment
- IngestionConfig ingestionConfigOverride =
- new IngestionConfig(batchIngestionConfigOverride, null, null, null,
null, null);
+ IngestionConfig ingestionConfigOverride = new IngestionConfig();
+
ingestionConfigOverride.setBatchIngestionConfig(batchIngestionConfigOverride);
TableConfig tableConfigOverride =
new
TableConfigBuilder(_tableConfig.getTableType()).setTableName(_tableConfig.getTableName())
.setIngestionConfig(ingestionConfigOverride).build();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
index b167a18256..12fad8d66a 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java
@@ -786,12 +786,12 @@ public class PinotHelixResourceManagerTest {
throws IOException, InterruptedException {
// Create the table
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"REFRESH", "DAILY"));
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_SEGMENTS_REPLACE_TEST_REFRESH_TABLE_NAME)
.setNumReplicas(2).setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME)
- .setIngestionConfig(
- new IngestionConfig(new BatchIngestionConfig(null, "REFRESH",
"DAILY"), null, null, null, null, null))
- .build();
+ .setIngestionConfig(ingestionConfig).build();
TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index 16e70e91eb..66246b9c62 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -100,8 +100,8 @@ public class SegmentLineageCleanupTest {
.setRetentionTimeValue(RETENTION_TIME_VALUE).build();
TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
- IngestionConfig ingestionConfig =
- new IngestionConfig(new BatchIngestionConfig(null, "REFRESH",
"DAILY"), null, null, null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"REFRESH", "DAILY"));
TableConfig refreshTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME)
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1)
.setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE)
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index af3d226708..85a834e9da 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.util;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.segment.local.utils.SchemaUtils;
@@ -91,9 +92,10 @@ public class SchemaUtilsTest {
// schema doesn't have destination columns from transformConfigs
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("colA", "round(colB, 1000)")),
- null, null)).build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("colA", "round(colB, 1000)")));
+ tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig).build();
try {
SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
Assert.fail("Should fail schema validation, as colA is not present in
schema");
@@ -140,8 +142,7 @@ public class SchemaUtilsTest {
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
.addDateTime(TIME_COLUMN, DataType.LONG, "1:MILLISECONDS:EPOCH",
"1:HOURS").build();
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN)
- .setIngestionConfig(new IngestionConfig(null, null, null,
- Lists.newArrayList(new TransformConfig("colA", "round(colB,
1000)")), null, null)).build();
+ .setIngestionConfig(ingestionConfig).build();
try {
SchemaUtils.validate(schema, Lists.newArrayList(tableConfig));
Assert.fail("Should fail schema validation, as colA is not present in
schema");
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
index 9fdd12e940..4f8a752226 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/IngestionConfigHybridIntegrationTest.java
@@ -20,7 +20,8 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import java.io.File;
-import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -60,17 +61,18 @@ public class IngestionConfigHybridIntegrationTest extends
BaseClusterIntegration
@Override
protected IngestionConfig getIngestionConfig() {
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(
+ new
StreamIngestionConfig(Collections.singletonList(getStreamConfigMap())));
FilterConfig filterConfig =
new FilterConfig("Groovy({AirlineID == 19393 || ArrDelayMinutes <= 5
}, AirlineID, ArrDelayMinutes)");
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("AmPm", "Groovy({DepTime < 1200 ?
\"AM\": \"PM\"}, DepTime)"));
- transformConfigs.add(new TransformConfig("millisSinceEpoch",
"fromEpochDays(DaysSinceEpoch)"));
- transformConfigs.add(new TransformConfig("lowerCaseDestCityName",
"lower(DestCityName)"));
-
- List<Map<String, String>> streamConfigMaps = new ArrayList<>();
- streamConfigMaps.add(getStreamConfigMap());
- return new IngestionConfig(null, new
StreamIngestionConfig(streamConfigMaps), filterConfig, transformConfigs, null,
- null);
+ ingestionConfig.setFilterConfig(filterConfig);
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("AmPm", "Groovy({DepTime < 1200 ? \"AM\": \"PM\"},
DepTime)"),
+ new TransformConfig("millisSinceEpoch",
"fromEpochDays(DaysSinceEpoch)"),
+ new TransformConfig("lowerCaseDestCityName", "lower(DestCityName)"));
+ ingestionConfig.setTransformConfigs(transformConfigs);
+ return ingestionConfig;
}
@Override
@@ -139,8 +141,8 @@ public class IngestionConfigHybridIntegrationTest extends
BaseClusterIntegration
addTableConfig(createRealtimeTableConfig(realtimeAvroFiles.get(0)));
// Create and upload segments
- ClusterIntegrationTestUtils
- .buildSegmentsFromAvro(offlineAvroFiles, offlineTableConfig, schema,
0, _segmentDir, _tarDir);
+ ClusterIntegrationTestUtils.buildSegmentsFromAvro(offlineAvroFiles,
offlineTableConfig, schema, 0, _segmentDir,
+ _tarDir);
uploadSegments(getTableName(), _tarDir);
// Push data into Kafka
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
index 065cca63de..31a2bedf9b 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/JsonPathClusterIntegrationTest.java
@@ -21,7 +21,6 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
import com.jayway.jsonpath.spi.cache.Cache;
import com.jayway.jsonpath.spi.cache.CacheProvider;
import java.io.File;
@@ -89,13 +88,16 @@ public class JsonPathClusterIntegrationTest extends
BaseClusterIntegrationTest {
.addSingleValueDimension(COMPLEX_MAP_STR_FIELD_NAME,
DataType.STRING)
.addMultiValueDimension(COMPLEX_MAP_STR_K3_FIELD_NAME,
DataType.STRING).build();
addSchema(schema);
- List<TransformConfig> transformConfigs = Lists.newArrayList(
+ List<TransformConfig> transformConfigs = Arrays.asList(
new TransformConfig(MY_MAP_STR_K1_FIELD_NAME, "jsonPathString(" +
MY_MAP_STR_FIELD_NAME + ", '$.k1')"),
new TransformConfig(MY_MAP_STR_K2_FIELD_NAME, "jsonPathString(" +
MY_MAP_STR_FIELD_NAME + ", '$.k2')"),
new TransformConfig(COMPLEX_MAP_STR_K3_FIELD_NAME,
"jsonPathArray(" + COMPLEX_MAP_STR_FIELD_NAME + ", '$.k3')"));
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
- .setIngestionConfig(new IngestionConfig(null, null, null,
transformConfigs, null, null)).build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setIngestionConfig(ingestionConfig)
+ .build();
addTableConfig(tableConfig);
// Create and upload segments
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index 726572a7ca..68ec52dc5d 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -19,7 +19,6 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
-import com.google.common.collect.Lists;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
@@ -80,11 +79,14 @@ public class MapTypeClusterIntegrationTest extends
BaseClusterIntegrationTest {
.addMultiValueDimension(INT_KEY_MAP_FIELD_NAME +
SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
.addSingleValueDimension(STRING_KEY_MAP_STR_FIELD_NAME,
DataType.STRING)
.addSingleValueDimension(INT_KEY_MAP_STR_FIELD_NAME,
DataType.STRING).build();
- List<TransformConfig> transformConfigs = Lists.newArrayList(
+ List<TransformConfig> transformConfigs = Arrays.asList(
new TransformConfig(STRING_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" +
STRING_KEY_MAP_FIELD_NAME + ")"),
new TransformConfig(INT_KEY_MAP_STR_FIELD_NAME, "toJsonMapStr(" +
INT_KEY_MAP_FIELD_NAME + ")"));
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName)
- .setIngestionConfig(new IngestionConfig(null, null, null,
transformConfigs, null, null)).build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setIngestionConfig(ingestionConfig)
+ .build();
addTableConfig(tableConfig);
// Create and upload segments
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 4dbaa85846..779f7780c4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -967,10 +967,13 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
_schemaFileName = SCHEMA_FILE_NAME_WITH_EXTRA_COLUMNS;
addSchema(createSchema());
TableConfig tableConfig = getOfflineTableConfig();
- tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
- Arrays.asList(new TransformConfig("NewAddedDerivedHoursSinceEpoch",
"times(DaysSinceEpoch, 24)"),
- new TransformConfig("NewAddedDerivedSecondsSinceEpoch",
"times(times(DaysSinceEpoch, 24), 3600)"),
- new TransformConfig("NewAddedDerivedMVStringDimension",
"split(DestCityName, ', ')")), null, null));
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("NewAddedDerivedHoursSinceEpoch",
"times(DaysSinceEpoch, 24)"),
+ new TransformConfig("NewAddedDerivedSecondsSinceEpoch",
"times(times(DaysSinceEpoch, 24), 3600)"),
+ new TransformConfig("NewAddedDerivedMVStringDimension",
"split(DestCityName, ', ')"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
+ tableConfig.setIngestionConfig(ingestionConfig);
updateTableConfig(tableConfig);
// Trigger reload
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
index fa67d8c62a..bfdef722fd 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentWriterUploaderIntegrationTest.java
@@ -20,10 +20,10 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Function;
-import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -89,8 +89,10 @@ public class SegmentWriterUploaderIntegrationTest extends
BaseClusterIntegration
batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_tarDir.getAbsolutePath());
batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "false");
batchConfigMap.put(BatchConfigProperties.PUSH_CONTROLLER_URI,
_controllerBaseApiUrl);
- return new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"),
null,
- null, null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"APPEND", "HOURLY"));
+ return ingestionConfig;
}
/**
diff --git
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
index 9cbb840997..db2e811c93 100644
---
a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
+++
b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/test/java/org/apache/pinot/hadoop/job/preprocess/DataPreprocessingHelperTest.java
@@ -54,8 +54,8 @@ public class DataPreprocessingHelperTest {
Path outputPath = new Path("mockOutputPath");
DataPreprocessingHelper dataPreprocessingHelper = new
AvroDataPreprocessingHelper(inputPaths, outputPath);
- BatchIngestionConfig batchIngestionConfig = new BatchIngestionConfig(null,
"APPEND", "DAILY");
- IngestionConfig ingestionConfig = new
IngestionConfig(batchIngestionConfig, null, null, null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"APPEND", "DAILY"));
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTableName").setIngestionConfig(ingestionConfig)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
index 1fbfcc59d1..5830f76e86 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGeneratorTest.java
@@ -96,8 +96,8 @@ public class MergeRollupTaskGeneratorTest {
assertTrue(pinotTaskConfigs.isEmpty());
// Skip task generation, if REFRESH table
- IngestionConfig ingestionConfig =
- new IngestionConfig(new BatchIngestionConfig(null, "REFRESH", null),
null, null, null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"REFRESH", null));
offlineTableConfig = getOfflineTableConfig(new HashMap<>());
offlineTableConfig.setIngestionConfig(ingestionConfig);
pinotTaskConfigs =
generator.generateTasks(Lists.newArrayList(offlineTableConfig));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
index be3118a51b..f2a197c892 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskExecutorTest.java
@@ -21,6 +21,7 @@ package
org.apache.pinot.plugin.minion.tasks.realtimetoofflinesegments;
import com.google.common.collect.Lists;
import java.io.File;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -98,14 +99,18 @@ public class RealtimeToOfflineSegmentsTaskExecutorTest {
TableConfig tableConfigWithSortedCol =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_WITH_SORTED_COL).setTimeColumnName(T)
.setSortedColumn(D1).build();
+ IngestionConfig ingestionConfigEpochHours = new IngestionConfig();
+ ingestionConfigEpochHours.setTransformConfigs(
+ Collections.singletonList(new TransformConfig(T_TRX,
"toEpochHours(t)")));
TableConfig tableConfigEpochHours =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_EPOCH_HOURS).setTimeColumnName(T_TRX)
- .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null,
null, null,
- Lists.newArrayList(new TransformConfig(T_TRX,
"toEpochHours(t)")), null, null)).build();
+
.setSortedColumn(D1).setIngestionConfig(ingestionConfigEpochHours).build();
+ IngestionConfig ingestionConfigSDF = new IngestionConfig();
+ ingestionConfigSDF.setTransformConfigs(
+ Collections.singletonList(new TransformConfig(T_TRX, "toDateTime(t,
'yyyyMMddHH')")));
TableConfig tableConfigSDF =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME_SDF).setTimeColumnName(T_TRX)
- .setSortedColumn(D1).setIngestionConfig(new IngestionConfig(null,
null, null,
- Lists.newArrayList(new TransformConfig(T_TRX, "toDateTime(t,
'yyyyMMddHH')")), null, null)).build();
+
.setSortedColumn(D1).setIngestionConfig(ingestionConfigSDF).build();
Schema schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1,
FieldSpec.DataType.STRING)
.addMetric(M1, FieldSpec.DataType.INT)
diff --git
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
index f48eb2195a..a6e4b9c32f 100644
---
a/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
+++
b/pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/test/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriterTest.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -69,14 +70,12 @@ public class FileBasedSegmentWriterTest {
Preconditions.checkState(_tmpDir.mkdirs());
_outputDir = new File(_tmpDir, "segmentWriterOutputDir");
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("aSimpleMap_str",
"jsonFormat(aSimpleMap)"));
- transformConfigs.add(new TransformConfig("anAdvancedMap_str",
"jsonFormat(anAdvancedMap)"));
- Map<String, String> batchConfigMap = new HashMap<>();
- batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_outputDir.getAbsolutePath());
- _ingestionConfig =
- new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"),
null,
- null, transformConfigs, null, null);
+ _ingestionConfig = new IngestionConfig();
+ _ingestionConfig.setBatchIngestionConfig(new
BatchIngestionConfig(Collections.singletonList(
+ Collections.singletonMap(BatchConfigProperties.OUTPUT_DIR_URI,
_outputDir.getAbsolutePath())), "APPEND",
+ "HOURLY"));
+ _ingestionConfig.setTransformConfigs(Arrays.asList(new
TransformConfig("aSimpleMap_str", "jsonFormat(aSimpleMap)"),
+ new TransformConfig("anAdvancedMap_str",
"jsonFormat(anAdvancedMap)")));
_tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(_ingestionConfig)
.setTimeColumnName(TIME_COLUMN_NAME).build();
@@ -101,7 +100,6 @@ public class FileBasedSegmentWriterTest {
@Test
public void testBatchConfigs()
throws Exception {
-
SegmentWriter segmentWriter = new FileBasedSegmentWriter();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME).build();
@@ -112,7 +110,8 @@ public class FileBasedSegmentWriterTest {
// expected
}
- tableConfig.setIngestionConfig(new IngestionConfig(null, null, null, null,
null, null));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ tableConfig.setIngestionConfig(ingestionConfig);
try {
segmentWriter.init(tableConfig, _schema);
Assert.fail("Should fail due to missing batchIngestionConfig");
@@ -120,8 +119,7 @@ public class FileBasedSegmentWriterTest {
// expected
}
- tableConfig.setIngestionConfig(
- new IngestionConfig(new BatchIngestionConfig(null, "APPEND",
"HOURLY"), null, null, null, null, null));
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"APPEND", "HOURLY"));
try {
segmentWriter.init(tableConfig, _schema);
Assert.fail("Should fail due to missing batchConfigMaps");
@@ -129,9 +127,7 @@ public class FileBasedSegmentWriterTest {
// expected
}
- tableConfig.setIngestionConfig(
- new IngestionConfig(new BatchIngestionConfig(Collections.emptyList(),
"APPEND", "HOURLY"), null, null, null,
- null, null));
+ ingestionConfig.setBatchIngestionConfig(new
BatchIngestionConfig(Collections.emptyList(), "APPEND", "HOURLY"));
try {
segmentWriter.init(tableConfig, _schema);
Assert.fail("Should fail due to missing batchConfigMaps");
@@ -139,9 +135,8 @@ public class FileBasedSegmentWriterTest {
// expected
}
- tableConfig.setIngestionConfig(
- new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(Collections.emptyMap()), "APPEND",
"HOURLY"),
- null, null, null, null, null));
+ ingestionConfig.setBatchIngestionConfig(
+ new
BatchIngestionConfig(Collections.singletonList(Collections.emptyMap()),
"APPEND", "HOURLY"));
try {
segmentWriter.init(tableConfig, _schema);
Assert.fail("Should fail due to missing outputDirURI in batchConfigMap");
@@ -149,11 +144,9 @@ public class FileBasedSegmentWriterTest {
// expected
}
- Map<String, String> batchConfigMap = new HashMap<>();
- batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_outputDir.getAbsolutePath());
- tableConfig.setIngestionConfig(
- new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap), "APPEND", "HOURLY"),
null,
- null, null, null, null));
+ ingestionConfig.setBatchIngestionConfig(new
BatchIngestionConfig(Collections.singletonList(
+ Collections.singletonMap(BatchConfigProperties.OUTPUT_DIR_URI,
_outputDir.getAbsolutePath())), "APPEND",
+ "HOURLY"));
segmentWriter.init(tableConfig, _schema);
segmentWriter.close();
}
@@ -242,20 +235,19 @@ public class FileBasedSegmentWriterTest {
FileUtils.deleteQuietly(_outputDir);
// FIXED segment name
- Map<String, String> batchConfigMap =
_ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps().get(0);
- Map<String, String> batchConfigMapOverride = new HashMap<>(batchConfigMap);
- batchConfigMapOverride
- .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
BatchConfigProperties.SegmentNameGeneratorType.FIXED);
- batchConfigMapOverride.put(String
- .format("%s.%s",
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
- BatchConfigProperties.SEGMENT_NAME),
- "customSegmentName");
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_outputDir.getAbsolutePath());
+ batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+ BatchConfigProperties.SegmentNameGeneratorType.FIXED);
+ batchConfigMap.put(String.format("%s.%s",
BatchConfigProperties.SEGMENT_NAME_GENERATOR_PROP_PREFIX,
+ BatchConfigProperties.SEGMENT_NAME), "customSegmentName");
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"APPEND", "HOURLY"));
+
ingestionConfig.setTransformConfigs(_ingestionConfig.getTransformConfigs());
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
- .setIngestionConfig(new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()),
null, null,
- _ingestionConfig.getTransformConfigs(), null, null)).build();
+ .setIngestionConfig(ingestionConfig).build();
SegmentWriter segmentWriter = new FileBasedSegmentWriter();
segmentWriter.init(tableConfig, _schema);
@@ -274,14 +266,8 @@ public class FileBasedSegmentWriterTest {
segmentWriter.close();
// NORMALIZED segment name
- batchConfigMapOverride = new HashMap<>(batchConfigMap);
-
batchConfigMapOverride.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+ batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
BatchConfigProperties.SegmentNameGeneratorType.NORMALIZED_DATE);
- tableConfig.setIngestionConfig(new IngestionConfig(
- new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()),
null, null,
- _ingestionConfig.getTransformConfigs(), null, null));
segmentWriter.init(tableConfig, _schema);
// write 2 records
@@ -297,15 +283,9 @@ public class FileBasedSegmentWriterTest {
FileUtils.deleteQuietly(_outputDir);
// SIMPLE segment name w/ sequenceId
- batchConfigMapOverride = new HashMap<>(batchConfigMap);
- batchConfigMapOverride
- .put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
- batchConfigMapOverride.put(BatchConfigProperties.SEQUENCE_ID, "1001");
- tableConfig.setIngestionConfig(new IngestionConfig(
- new BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()),
null, null,
- _ingestionConfig.getTransformConfigs(), null, null));
+ batchConfigMap.put(BatchConfigProperties.SEGMENT_NAME_GENERATOR_TYPE,
+ BatchConfigProperties.SegmentNameGeneratorType.SIMPLE);
+ batchConfigMap.put(BatchConfigProperties.SEQUENCE_ID, "1001");
segmentWriter.init(tableConfig, _schema);
// write 2 records
@@ -329,16 +309,18 @@ public class FileBasedSegmentWriterTest {
throws Exception {
FileUtils.deleteQuietly(_outputDir);
- SegmentWriter segmentWriter = new FileBasedSegmentWriter();
- Map<String, String> batchConfigMap =
_ingestionConfig.getBatchIngestionConfig().getBatchConfigMaps().get(0);
- Map<String, String> batchConfigMapOverride = new HashMap<>(batchConfigMap);
- batchConfigMapOverride.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+ Map<String, String> batchConfigMap = new HashMap<>();
+ batchConfigMap.put(BatchConfigProperties.OUTPUT_DIR_URI,
_outputDir.getAbsolutePath());
+ batchConfigMap.put(BatchConfigProperties.OVERWRITE_OUTPUT, "true");
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"APPEND", "HOURLY"));
+
ingestionConfig.setTransformConfigs(_ingestionConfig.getTransformConfigs());
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
- .setIngestionConfig(new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMapOverride),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionType(),
-
_ingestionConfig.getBatchIngestionConfig().getSegmentIngestionFrequency()),
null, null,
- _ingestionConfig.getTransformConfigs(), null, null)).build();
+ .setIngestionConfig(ingestionConfig).build();
+
+ SegmentWriter segmentWriter = new FileBasedSegmentWriter();
segmentWriter.init(tableConfig, _schema);
// write 3 records with same timestamp
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
index 3415a46995..1c37779339 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformerTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.recordtransformer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -53,15 +54,17 @@ public class ExpressionTransformerTest {
.addMultiValueDimension("map2_values",
FieldSpec.DataType.INT).addMetric("cost", FieldSpec.DataType.DOUBLE)
.addDateTime("hoursSinceEpoch", FieldSpec.DataType.LONG,
"1:HOURS:EPOCH", "1:HOURS").build();
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("userId", "Groovy({user_id},
user_id)"));
- transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName+'
'+lastName}, firstName, lastName)"));
- transformConfigs.add(new TransformConfig("maxBid", "Groovy({bids.max{
it.toBigDecimal() }}, bids)"));
- transformConfigs.add(new TransformConfig("map2_keys",
"Groovy({map2.sort()*.key}, map2)"));
- transformConfigs.add(new TransformConfig("map2_values",
"Groovy({map2.sort()*.value}, map2)"));
- transformConfigs.add(new TransformConfig("hoursSinceEpoch",
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("userId", "Groovy({user_id}, user_id)"),
+ new TransformConfig("fullName", "Groovy({firstName+' '+lastName},
firstName, lastName)"),
+ new TransformConfig("maxBid", "Groovy({bids.max{ it.toBigDecimal() }},
bids)"),
+ new TransformConfig("map2_keys", "Groovy({map2.sort()*.key}, map2)"),
+ new TransformConfig("map2_values", "Groovy({map2.sort()*.value},
map2)"),
+ new TransformConfig("hoursSinceEpoch",
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions")
- .setIngestionConfig(new IngestionConfig(null, null, null,
transformConfigs, null, null)).build();
+ .setIngestionConfig(ingestionConfig).build();
ExpressionTransformer expressionTransformer = new
ExpressionTransformer(tableConfig, pinotSchema);
DataTypeTransformer dataTypeTransformer = new
DataTypeTransformer(pinotSchema);
@@ -145,12 +148,14 @@ public class ExpressionTransformerTest {
// also specified in table config, ignore the schema setting
pinotSchema.getFieldSpecFor("hoursSinceEpoch").setTransformFunction("Groovy({timestamp/(1000)},
timestamp)");
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("userId", "Groovy({user_id},
user_id)"));
- transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName+'
'+lastName}, firstName, lastName)"));
- transformConfigs.add(new TransformConfig("hoursSinceEpoch",
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("userId", "Groovy({user_id}, user_id)"),
+ new TransformConfig("fullName", "Groovy({firstName+' '+lastName},
firstName, lastName)"),
+ new TransformConfig("hoursSinceEpoch",
"Groovy({timestamp/(1000*60*60)}, timestamp)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTransformFunctions")
- .setIngestionConfig(new IngestionConfig(null, null, null,
transformConfigs, null, null)).build();
+ .setIngestionConfig(ingestionConfig).build();
ExpressionTransformer expressionTransformer = new
ExpressionTransformer(tableConfig, pinotSchema);
@@ -201,10 +206,13 @@ public class ExpressionTransformerTest {
Schema pinotSchema = new Schema();
DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("fullName",
FieldSpec.DataType.STRING, true);
pinotSchema.addField(dimensionFieldSpec);
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("fullName", "Groovy({firstName +
' ' + lastName}, firstName, lastName)"));
- TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists")
- .setIngestionConfig(new IngestionConfig(null, null, null,
transformConfigs, null, null)).build();
+ List<TransformConfig> transformConfigs = Collections.singletonList(
+ new TransformConfig("fullName", "Groovy({firstName + ' ' + lastName},
firstName, lastName)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists").setIngestionConfig(ingestionConfig)
+ .build();
ExpressionTransformer expressionTransformer = new
ExpressionTransformer(tableConfig, pinotSchema);
GenericRow genericRow = new GenericRow();
@@ -220,7 +228,7 @@ public class ExpressionTransformerTest {
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "incoming"),
new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
"outgoing")).build();
tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName("testValueExists")
- .setIngestionConfig(new IngestionConfig(null, null, null, null, null,
null)).build();
+ .setIngestionConfig(new IngestionConfig()).build();
expressionTransformer = new ExpressionTransformer(tableConfig,
pinotSchema);
genericRow = new GenericRow();
@@ -238,13 +246,14 @@ public class ExpressionTransformerTest {
.addSingleValueDimension("b",
FieldSpec.DataType.STRING).addSingleValueDimension("c",
FieldSpec.DataType.STRING)
.addSingleValueDimension("d",
FieldSpec.DataType.STRING).addSingleValueDimension("e",
FieldSpec.DataType.STRING)
.addSingleValueDimension("f", FieldSpec.DataType.STRING).build();
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("d", "plus(x, 10)"));
- transformConfigs.add(new TransformConfig("b", "plus(d, 10)"));
- transformConfigs.add(new TransformConfig("a", "plus(b, 10)"));
- transformConfigs.add(new TransformConfig("c", "plus(a, d)"));
- transformConfigs.add(new TransformConfig("f", "plus(e, 10)"));
- IngestionConfig ingestionConfig = new IngestionConfig(null, null, null,
transformConfigs, null, null);
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("d", "plus(x, 10)"),
+ new TransformConfig("b", "plus(d, 10)"),
+ new TransformConfig("a", "plus(b, 10)"),
+ new TransformConfig("c", "plus(a, d)"),
+ new TransformConfig("f", "plus(e, 10)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testDerivedFunctions")
.setIngestionConfig(ingestionConfig).build();
ExpressionTransformer expressionTransformer = new
ExpressionTransformer(tableConfig, schema);
@@ -268,11 +277,11 @@ public class ExpressionTransformerTest {
.addSingleValueDimension("b",
FieldSpec.DataType.INT).addSingleValueDimension("c", FieldSpec.DataType.INT)
.build();
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("a", "plus(b,10)"));
- transformConfigs.add(new TransformConfig("a", "plus(c,10)"));
-
- IngestionConfig ingestionConfig = new IngestionConfig(null, null, null,
transformConfigs, null, null);
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("a", "plus(b,10)"),
+ new TransformConfig("a", "plus(c,10)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testMultipleTransformFunctionSortOrder")
.setIngestionConfig(ingestionConfig).build();
@@ -289,13 +298,13 @@ public class ExpressionTransformerTest {
.build();
// Define transform function dependencies: a -> (b,c), b -> d, d -> e, c
-> (d,e)
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("a", "plus(b,c)"));
- transformConfigs.add(new TransformConfig("b", "plus(d,10)"));
- transformConfigs.add(new TransformConfig("d", "plus(e,10)"));
- transformConfigs.add(new TransformConfig("c", "plus(d,e)"));
-
- IngestionConfig ingestionConfig = new IngestionConfig(null, null, null,
transformConfigs, null, null);
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("a", "plus(b,c)"),
+ new TransformConfig("b", "plus(d,10)"),
+ new TransformConfig("d", "plus(e,10)"),
+ new TransformConfig("c", "plus(d,e)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testNonCyclicTransformFunctionSortOrder")
.setIngestionConfig(ingestionConfig).build();
@@ -318,12 +327,12 @@ public class ExpressionTransformerTest {
.build();
// Define transform function dependencies: a -> b, b -> c, c -> a
- List<TransformConfig> transformConfigs = new ArrayList<>();
- transformConfigs.add(new TransformConfig("a", "plus(b,10)"));
- transformConfigs.add(new TransformConfig("b", "plus(c,10)"));
- transformConfigs.add(new TransformConfig("c", "plus(a,10)"));
-
- IngestionConfig ingestionConfig = new IngestionConfig(null, null, null,
transformConfigs, null, null);
+ List<TransformConfig> transformConfigs = Arrays.asList(
+ new TransformConfig("a", "plus(b,10)"),
+ new TransformConfig("b", "plus(c,10)"),
+ new TransformConfig("c", "plus(a,10)"));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(transformConfigs);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testRecrusiveTransformFunctionSortOrder")
.setIngestionConfig(ingestionConfig).build();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
index f479fc351b..6cc0d2bfb7 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/RecordTransformerTest.java
@@ -87,35 +87,34 @@ public class RecordTransformerTest {
@Test
public void testFilterTransformer() {
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
// expression false, not filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({svInt > 123},
svInt)"));
GenericRow genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("Groovy({svInt >
123}, svInt)"), null, null, null));
+ tableConfig.setIngestionConfig(ingestionConfig);
RecordTransformer transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({svInt <= 123},
svInt)"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("Groovy({svInt <=
123}, svInt)"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// value not found
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({notPresent ==
123}, notPresent)"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("Groovy({notPresent
== 123}, notPresent)"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertFalse(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// invalid function
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("Groovy(svInt ==
123)"), null, null, null));
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy(svInt == 123)"));
try {
new FilterTransformer(tableConfig);
Assert.fail("Should have failed constructing FilterTransformer");
@@ -124,9 +123,8 @@ public class RecordTransformerTest {
}
// multi value column
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({svFloat.max() <
500}, svFloat)"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new
FilterConfig("Groovy({svFloat.max() < 500}, svFloat)"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -181,75 +179,62 @@ public class RecordTransformerTest {
@Test
public void testScalarOps() {
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123"));
GenericRow genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svInt = 123"), null, null, null));
RecordTransformer transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svDouble > 120"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svDouble > 120"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svDouble >= 123"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svDouble >= 123"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svDouble < 200"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svDouble < 200"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svDouble <= 123"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svDouble <= 123"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svLong != 125"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svLong != 125"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svLong = 123"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svLong = 123"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("between(svLong, 100,
125)"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("between(svLong, 100,
125)"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -267,36 +252,34 @@ public class RecordTransformerTest {
@Test
public void testObjectOps() {
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svNullString is null"));
GenericRow genericRow = getNullColumnsRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("svNullString is
null"), null, null, null));
RecordTransformer transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svInt is not null"));
genericRow = getNullColumnsRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("svInt is not null"),
null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("mvLong is not null"));
genericRow = getNullColumnsRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("mvLong is not null"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("mvNullFloat is null"));
genericRow = getNullColumnsRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null, new FilterConfig("mvNullFloat is null"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
@@ -304,22 +287,20 @@ public class RecordTransformerTest {
@Test
public void testLogicalScalarOps() {
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svInt = 123 AND svDouble <= 200"));
GenericRow genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svInt = 123 AND svDouble <= 200"), null, null, null));
RecordTransformer transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
// expression true, filtered
+ ingestionConfig.setFilterConfig(new FilterConfig("svInt = 125 OR svLong <= 200"));
genericRow = getRecord();
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, null,
- new FilterConfig("svInt = 125 OR svLong <= 200"), null, null, null));
transformer = new FilterTransformer(tableConfig);
transformer.transform(genericRow);
Assert.assertTrue(genericRow.getFieldToValueMap().containsKey(GenericRow.SKIP_RECORD_KEY));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
index be0462dae3..b6c495d820 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/creator/SegmentGenerationWithFilterRecordsTest.java
@@ -67,8 +67,8 @@ public class SegmentGenerationWithFilterRecordsTest {
public void setup() {
String filterFunction =
"Groovy({((col2 < 1589007600000L) && (col3.max() < 4)) || col1 == \"B\"}, col1, col2, col3)";
- IngestionConfig ingestionConfig =
- new IngestionConfig(null, null, new FilterConfig(filterFunction),
null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setFilterConfig(new FilterConfig(filterFunction));
_tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setIngestionConfig(ingestionConfig).build();
_schema = new
Schema.SchemaBuilder().addSingleValueDimension(STRING_COLUMN,
FieldSpec.DataType.STRING)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
index 8afb04880c..861743ebb0 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/segment/index/loader/SegmentPreProcessorTest.java
@@ -590,9 +590,10 @@ public class SegmentPreProcessorTest {
public void testV1UpdateDefaultColumns()
throws Exception {
constructV1Segment();
- _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
- Collections.singletonList(new
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null,
- null));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")));
+ _tableConfig.setIngestionConfig(ingestionConfig);
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
checkUpdateDefaultColumns();
@@ -636,9 +637,10 @@ public class SegmentPreProcessorTest {
SegmentMetadataImpl segmentMetadata = new SegmentMetadataImpl(_indexDir);
assertEquals(segmentMetadata.getVersion(), SegmentVersion.v3);
- _tableConfig.setIngestionConfig(new IngestionConfig(null, null, null,
- Collections.singletonList(new
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")), null,
- null));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new
TransformConfig(NEW_INT_SV_DIMENSION_COLUMN_NAME, "plus(column1, 1)")));
+ _tableConfig.setIngestionConfig(ingestionConfig);
_indexLoadingConfig.getInvertedIndexColumns().add(NEW_COLUMN_INVERTED_INDEX);
checkUpdateDefaultColumns();
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
index d1c5e660ee..9b54600e93 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/IngestionUtilsTest.java
@@ -22,9 +22,8 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.spi.config.table.ingestion.AggregationConfig;
@@ -90,16 +89,16 @@ public class IngestionUtilsTest {
// Time field spec
// only incoming
- schema = new Schema.SchemaBuilder()
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "time"), null).build();
+ schema = new Schema.SchemaBuilder().addTime(
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "time"), null).build();
extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null,
schema));
Assert.assertEquals(extract.size(), 1);
Assert.assertTrue(extract.contains("time"));
// incoming and outgoing different column name
- schema = new Schema.SchemaBuilder()
- .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "in"),
- new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "out")).build();
+ schema = new Schema.SchemaBuilder().addTime(
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "in"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "out")).build();
extract = new ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(null,
schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out")));
@@ -138,8 +137,8 @@ public class IngestionUtilsTest {
Schema schema = new Schema();
// filter config
- IngestionConfig ingestionConfig =
- new IngestionConfig(null, null, new FilterConfig("Groovy({x > 100}, x)"), null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({x > 100}, x)"));
Set<String> fields =
IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
Assert.assertEquals(fields.size(), 1);
Assert.assertTrue(fields.containsAll(Sets.newHashSet("x")));
@@ -151,24 +150,23 @@ public class IngestionUtilsTest {
// transform configs
schema = new Schema.SchemaBuilder().addSingleValueDimension("d1",
FieldSpec.DataType.STRING).build();
- List<TransformConfig> transformConfigs =
- Lists.newArrayList(new TransformConfig("d1", "Groovy({function},
argument1, argument2)"));
- ingestionConfig = new IngestionConfig(null, null, null, transformConfigs,
null, null);
+ ingestionConfig = new IngestionConfig();
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new TransformConfig("d1",
"Groovy({function}, argument1, argument2)")));
List<String> extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 3);
Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1",
"argument2")));
// groovy function, no arguments
- transformConfigs = Lists.newArrayList(new TransformConfig("d1",
"Groovy({function})"));
- ingestionConfig = new IngestionConfig(null, null, null, transformConfigs,
null, null);
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("d1", "Groovy({function})")));
extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 1);
Assert.assertTrue(extract.contains("d1"));
// inbuilt functions
schema = new
Schema.SchemaBuilder().addSingleValueDimension("hoursSinceEpoch",
FieldSpec.DataType.LONG).build();
- transformConfigs = Lists.newArrayList(new
TransformConfig("hoursSinceEpoch", "toEpochHours(timestampColumn)"));
- ingestionConfig = new IngestionConfig(null, null, null, transformConfigs,
null, null);
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new TransformConfig("hoursSinceEpoch",
"toEpochHours(timestampColumn)")));
extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("timestampColumn",
"hoursSinceEpoch")));
@@ -176,19 +174,17 @@ public class IngestionUtilsTest {
// inbuilt functions with literal
schema =
new
Schema.SchemaBuilder().addSingleValueDimension("tenMinutesSinceEpoch",
FieldSpec.DataType.LONG).build();
- transformConfigs =
- Lists.newArrayList(new TransformConfig("tenMinutesSinceEpoch",
"toEpochMinutesBucket(timestampColumn, 10)"));
- ingestionConfig = new IngestionConfig(null, null, null, transformConfigs,
null, null);
+ ingestionConfig.setTransformConfigs(Collections.singletonList(
+ new TransformConfig("tenMinutesSinceEpoch",
"toEpochMinutesBucket(timestampColumn, 10)")));
extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch",
"timestampColumn")));
// inbuilt functions on DateTimeFieldSpec
- schema = new Schema.SchemaBuilder()
- .addDateTime("dateColumn", FieldSpec.DataType.STRING,
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
- transformConfigs =
- Lists.newArrayList(new TransformConfig("dateColumn",
"toDateTime(timestampColumn, 'yyyy-MM-dd')"));
- ingestionConfig = new IngestionConfig(null, null, null, transformConfigs,
null, null);
+ schema = new Schema.SchemaBuilder().addDateTime("dateColumn",
FieldSpec.DataType.STRING,
+ "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new TransformConfig("dateColumn",
"toDateTime(timestampColumn, 'yyyy-MM-dd')")));
extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Lists.newArrayList("dateColumn",
"timestampColumn")));
@@ -198,58 +194,41 @@ public class IngestionUtilsTest {
.addSingleValueDimension("d2",
FieldSpec.DataType.STRING).addMetric("m1", FieldSpec.DataType.INT)
.addDateTime("dateColumn", FieldSpec.DataType.STRING,
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)");
- transformConfigs =
- Lists.newArrayList(new TransformConfig("dateColumn",
"toDateTime(timestampColumn, 'yyyy-MM-dd')"));
- ingestionConfig =
- new IngestionConfig(null, null, new FilterConfig("Groovy({d1 ==
\"10\"}, d1)"), transformConfigs, null, null);
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({d1 == \"10\"}, d1)"));
extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 6);
Assert.assertTrue(extract.containsAll(Lists.newArrayList("d1", "d2", "m1",
"dateColumn", "xy", "timestampColumn")));
// filter + transform configs + schema fields + schema transform +
complex type configs
schema = new Schema.SchemaBuilder().addSingleValueDimension("d1",
FieldSpec.DataType.STRING)
- .addSingleValueDimension("d2", FieldSpec.DataType.STRING)
- .addMetric("m1", FieldSpec.DataType.INT)
- .addDateTime("dateColumn", FieldSpec.DataType.STRING,
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS")
- .build();
+ .addSingleValueDimension("d2",
FieldSpec.DataType.STRING).addMetric("m1", FieldSpec.DataType.INT)
+ .addDateTime("dateColumn", FieldSpec.DataType.STRING,
"1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS").build();
schema.getFieldSpecFor("d2").setTransformFunction("reverse(xy)");
- TransformConfig transformConfig = new TransformConfig("dateColumn",
"toDateTime(timestampColumn, 'yyyy-MM-dd')");
- transformConfigs = Lists.newArrayList(transformConfig);
- List<String> fieldsToUnnest = Arrays.asList("before.test", "after.test");
- Map<String, String> prefixesToRename = new HashMap<>();
- prefixesToRename.put("before", "after");
- ComplexTypeConfig complexTypeConfigs = new
ComplexTypeConfig(fieldsToUnnest, ".",
- ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
prefixesToRename);
- FilterConfig filterConfig = new FilterConfig("Groovy({d1 == \"10\"}, d1)");
- ingestionConfig = new IngestionConfig(null, null, filterConfig,
transformConfigs, complexTypeConfigs, null);
+ ingestionConfig.setComplexTypeConfig(new
ComplexTypeConfig(Arrays.asList("before.test", "after.test"), ".",
+ ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE,
Collections.singletonMap("before", "after")));
extract = new
ArrayList<>(IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema));
Assert.assertEquals(extract.size(), 8);
List<String> expectedColumns =
- Lists.newArrayList("d1", "d2", "m1", "dateColumn", "xy",
"timestampColumn", "before", "after");
+ Arrays.asList("d1", "d2", "m1", "dateColumn", "xy", "timestampColumn",
"before", "after");
Assert.assertTrue(extract.containsAll(expectedColumns));
}
@Test
public void testExtractFieldsAggregationConfig() {
+ IngestionConfig ingestionConfig = new IngestionConfig();
Schema schema = new Schema();
- List<AggregationConfig> aggregationConfigs = Arrays.asList(new
AggregationConfig("d1", "SUM(s1)"));
- IngestionConfig ingestionConfig = new IngestionConfig(null, null, null,
null, null, aggregationConfigs);
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("d1", "SUM(s1)")));
Set<String> fields =
IngestionUtils.getFieldsForRecordExtractor(ingestionConfig, schema);
Assert.assertEquals(fields.size(), 1);
Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MIN(s1)"));
- ingestionConfig = new IngestionConfig(null, null, null, null, null,
aggregationConfigs);
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("d1", "MIN(s1)")));
fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema);
Assert.assertEquals(fields.size(), 1);
Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "MAX(s1)"));
- ingestionConfig = new IngestionConfig(null, null, null, null, null,
aggregationConfigs);
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("d1", "MAX(s1)")));
fields = IngestionUtils.getFieldsForRecordExtractor(ingestionConfig,
schema);
Assert.assertEquals(fields.size(), 1);
Assert.assertTrue(fields.containsAll(Sets.newHashSet("s1")));
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index f074f6c695..f1578b1970 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -45,7 +45,9 @@ import
org.apache.pinot.spi.config.table.ingestion.FilterConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.stream.StreamConfigProperties;
@@ -229,31 +231,24 @@ public class TableConfigUtilsTest {
TableConfigUtils.validate(tableConfig, schema);
// null filter config, transform config
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, null, null, null,
null)).build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ tableConfig.setIngestionConfig(ingestionConfig);
TableConfigUtils.validate(tableConfig, schema);
// null filter function
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, new
FilterConfig(null), null, null, null)).build();
+ ingestionConfig.setFilterConfig(new FilterConfig(null));
TableConfigUtils.validate(tableConfig, schema);
// valid filterFunction
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, new
FilterConfig("startsWith(columnX, \"myPrefix\")"), null, null, null))
- .build();
+ ingestionConfig.setFilterConfig(new FilterConfig("startsWith(columnX,
\"myPrefix\")"));
TableConfigUtils.validate(tableConfig, schema);
// valid filterFunction
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, new
FilterConfig("Groovy({x == 10}, x)"), null, null, null))
- .build();
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({x == 10}, x)"));
TableConfigUtils.validate(tableConfig, schema);
// invalid filter function
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, new
FilterConfig("Groovy(badExpr)"), null, null, null))
- .build();
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy(badExpr)"));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail on invalid filter function string");
@@ -261,9 +256,7 @@ public class TableConfigUtilsTest {
// expected
}
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, new
FilterConfig("fakeFunction(xx)"), null, null, null))
- .build();
+ ingestionConfig.setFilterConfig(new FilterConfig("fakeFunction(xx)"));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid filter function");
@@ -272,14 +265,12 @@ public class TableConfigUtilsTest {
}
// empty transform configs
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, null,
Collections.emptyList(), null, null)).build();
+ ingestionConfig.setFilterConfig(null);
+ ingestionConfig.setTransformConfigs(Collections.emptyList());
TableConfigUtils.validate(tableConfig, schema);
// transformed column not in schema
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)")),
- null, null)).build();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("myCol", "reverse(anotherCol)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for transformedColumn not present in schema");
@@ -288,29 +279,26 @@ public class TableConfigUtilsTest {
}
// using a transformation column in an aggregation
- schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addMetric("twiceSum", FieldSpec.DataType.DOUBLE).build();
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("twice", "col * 2")),
- null, Lists.newArrayList((new AggregationConfig("twiceSum",
"SUM(twice)"))))).build();
+ schema =
+ new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("twiceSum",
FieldSpec.DataType.DOUBLE).build();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("twice", "col * 2")));
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("twiceSum", "SUM(twice)")));
TableConfigUtils.validate(tableConfig, schema);
+ // valid transform configs
schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.build();
- // valid transform configs
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)")),
- null, null)).build();
+ ingestionConfig.setAggregationConfigs(null);
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("myCol", "reverse(anotherCol)")));
TableConfigUtils.validate(tableConfig, schema);
+ // valid transform configs
schema =
new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("myCol",
FieldSpec.DataType.STRING)
.addMetric("transformedCol", FieldSpec.DataType.LONG).build();
- // valid transform configs
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(anotherCol)"),
- new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")),
null, null)).build();
+ ingestionConfig.setTransformConfigs(Arrays.asList(new
TransformConfig("myCol", "reverse(anotherCol)"),
+ new TransformConfig("transformedCol", "Groovy({x+y}, x, y)")));
TableConfigUtils.validate(tableConfig, schema);
// invalid transform config since Groovy is disabled
@@ -322,9 +310,8 @@ public class TableConfigUtilsTest {
}
// invalid filter config since Groovy is disabled
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, new
FilterConfig("Groovy({timestamp > 0}, timestamp)"), null, null, null))
- .build();
+ ingestionConfig.setTransformConfigs(null);
+ ingestionConfig.setFilterConfig(new FilterConfig("Groovy({timestamp > 0}, timestamp)"));
try {
TableConfigUtils.validate(tableConfig, schema, null, true);
Assert.fail("Should fail when Groovy functions disabled but found in
filter config");
@@ -333,9 +320,8 @@ public class TableConfigUtilsTest {
}
// null transform column name
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig(null, "reverse(anotherCol)")),
- null, null)).build();
+ ingestionConfig.setFilterConfig(null);
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig(null, "reverse(anotherCol)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for null column name in transform config");
@@ -344,9 +330,7 @@ public class TableConfigUtilsTest {
}
// null transform function string
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", null)), null, null))
- .build();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("myCol", null)));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for null transform function in transform
config");
@@ -355,9 +339,7 @@ public class TableConfigUtilsTest {
}
// invalid function
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "fakeFunction(col)")),
- null, null)).build();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("myCol", "fakeFunction(col)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid transform function in transform
config");
@@ -366,9 +348,7 @@ public class TableConfigUtilsTest {
}
// invalid function
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "Groovy(badExpr)")), null,
- null)).build();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("myCol", "Groovy(badExpr)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail for invalid transform function in transform
config");
@@ -377,9 +357,7 @@ public class TableConfigUtilsTest {
}
// input field name used as destination field
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("myCol", "reverse(myCol)")), null,
- null)).build();
+ ingestionConfig.setTransformConfigs(Collections.singletonList(new
TransformConfig("myCol", "reverse(myCol)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail due to use of myCol as arguments and
columnName");
@@ -388,10 +366,8 @@ public class TableConfigUtilsTest {
}
// input field name used as destination field
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null,
- Lists.newArrayList(new TransformConfig("myCol", "Groovy({x + y
+ myCol}, x, myCol, y)")), null, null))
- .build();
+ ingestionConfig.setTransformConfigs(
+ Collections.singletonList(new TransformConfig("myCol", "Groovy({x + y
+ myCol}, x, myCol, y)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail due to use of myCol as arguments and
columnName");
@@ -400,10 +376,8 @@ public class TableConfigUtilsTest {
}
// duplicate transform config
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null,
- Lists.newArrayList(new TransformConfig("myCol", "reverse(x)"), new
TransformConfig("myCol", "lower(y)")),
- null, null)).build();
+ ingestionConfig.setTransformConfigs(
+ Arrays.asList(new TransformConfig("myCol", "reverse(x)"), new
TransformConfig("myCol", "lower(y)")));
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail due to duplicate transform config");
@@ -412,19 +386,16 @@ public class TableConfigUtilsTest {
}
// derived columns - should pass
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(
- new IngestionConfig(null, null, null, Lists.newArrayList(new
TransformConfig("transformedCol", "reverse(x)"),
- new TransformConfig("myCol", "lower(transformedCol)")), null,
null)).build();
+ ingestionConfig.setTransformConfigs(Arrays.asList(new
TransformConfig("transformedCol", "reverse(x)"),
+ new TransformConfig("myCol", "lower(transformedCol)")));
TableConfigUtils.validate(tableConfig, schema);
// invalid field name in schema with matching prefix from
complexConfigType's prefixesToRename
- HashMap<String, String> prefixesToRename = new HashMap<>();
- prefixesToRename.put("after.", "");
- ComplexTypeConfig complexConfig = new ComplexTypeConfig(null, ".", null,
prefixesToRename);
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
- .setIngestionConfig(new IngestionConfig(null, null, null, null,
complexConfig, null)).build();
+ ingestionConfig.setTransformConfigs(null);
+ ingestionConfig.setComplexTypeConfig(
+ new ComplexTypeConfig(null, ".", null,
Collections.singletonMap("after.", "")));
schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
- .addMultiValueDimension("after.test",
FieldSpec.DataType.STRING).build();
+ .addMultiValueDimension("after.test",
FieldSpec.DataType.STRING).build();
try {
TableConfigUtils.validate(tableConfig, schema);
Assert.fail("Should fail due to name conflict from field name in schema
with a prefix in prefixesToRename");
@@ -435,44 +406,30 @@ public class TableConfigUtilsTest {
@Test
public void ingestionAggregationConfigsTest() {
- List<AggregationConfig> aggregationConfigs = Arrays.asList(new
AggregationConfig("d1", "SUM(s1)"));
- IngestionConfig ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
-
- Schema schema =
- new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1",
FieldSpec.DataType.DOUBLE).build();
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME)
+ .addDateTime("timeColumn", FieldSpec.DataType.TIMESTAMP,
"1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("d1", "SUM(s1)")));
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
-
.setIngestionConfig(ingestionConfig).setAggregateMetrics(true).build();
-
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
+ .setIngestionConfig(ingestionConfig).build();
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
- Assert.fail("Should fail due to aggregateMetrics being set");
+ Assert.fail("Should fail due to destination column not being in schema");
} catch (IllegalStateException e) {
// expected
}
- schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
+ schema.addField(new DimensionFieldSpec("d1", FieldSpec.DataType.DOUBLE,
true));
+ tableConfig.getIndexingConfig().setAggregateMetrics(true);
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
- Assert.fail("Should fail due to destination column not being in schema");
+ Assert.fail("Should fail due to aggregateMetrics being set");
} catch (IllegalStateException e) {
// expected
}
- schema =
- new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension("d1",
FieldSpec.DataType.DOUBLE)
- .build();
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ tableConfig.getIndexingConfig().setAggregateMetrics(false);
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to aggregation column being a dimension");
@@ -480,14 +437,8 @@ public class TableConfigUtilsTest {
// expected
}
- schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1",
FieldSpec.DataType.DOUBLE).build();
- aggregationConfigs = Arrays.asList(new AggregationConfig(null, null));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ schema.addField(new MetricFieldSpec("m1", FieldSpec.DataType.DOUBLE));
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig(null, null)));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to null columnName/aggregationFunction");
@@ -495,13 +446,8 @@ public class TableConfigUtilsTest {
// expected
}
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"),
new AggregationConfig("d1", "SUM(s2)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ ingestionConfig.setAggregationConfigs(
+ Arrays.asList(new AggregationConfig("m1", "SUM(s1)"), new
AggregationConfig("m1", "SUM(s2)")));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to duplicate destination column");
@@ -509,13 +455,7 @@ public class TableConfigUtilsTest {
// expected
}
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM s1"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("m1", "SUM s1")));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to invalid aggregation function");
@@ -523,13 +463,8 @@ public class TableConfigUtilsTest {
// expected
}
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1",
"DISTINCTCOUNTHLL(s1)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ ingestionConfig.setAggregationConfigs(
+ Collections.singletonList(new AggregationConfig("m1",
"DISTINCTCOUNTHLL(s1)")));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to not supported aggregation function");
@@ -537,13 +472,7 @@ public class TableConfigUtilsTest {
// expected
}
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "s1 + s2"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("m1", "s1 + s2")));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to multiple arguments");
@@ -551,13 +480,7 @@ public class TableConfigUtilsTest {
// expected
}
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1 -
s2)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("m1", "SUM(s1 - s2)")));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to inner value not being a column");
@@ -565,51 +488,28 @@ public class TableConfigUtilsTest {
// expected
}
- schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1",
FieldSpec.DataType.DOUBLE).build();
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(d1)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
-
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("m1", "SUM(m1)")));
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
- schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1",
FieldSpec.DataType.DOUBLE)
- .addMetric("d2", FieldSpec.DataType.DOUBLE).build();
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
+ ingestionConfig.setAggregationConfigs(Collections.singletonList(new
AggregationConfig("m1", "SUM(s1)")));
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+ schema.addField(new MetricFieldSpec("m2", FieldSpec.DataType.DOUBLE));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, schema);
Assert.fail("Should fail due to one metric column not being aggregated");
} catch (IllegalStateException e) {
// expected
}
-
-
- schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addMetric("d1",
FieldSpec.DataType.DOUBLE).build();
- aggregationConfigs = Arrays.asList(new AggregationConfig("d1", "SUM(s1)"));
- ingestionConfig =
- new IngestionConfig(null, null, null, null, null, aggregationConfigs);
- tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
- .setIngestionConfig(ingestionConfig).build();
- TableConfigUtils.validateIngestionConfig(tableConfig, schema);
}
@Test
public void ingestionStreamConfigsTest() {
Map<String, String> streamConfigs = getStreamConfigs();
- IngestionConfig ingestionConfig =
- new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(streamConfigs, streamConfigs)), null,
- null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Arrays.asList(streamConfigs, streamConfigs)));
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable_REALTIME").setTimeColumnName("timeColumn")
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME).setTimeColumnName("timeColumn")
.setIngestionConfig(ingestionConfig).build();
// only 1 stream config allowed
@@ -621,9 +521,7 @@ public class TableConfigUtilsTest {
}
// stream config should be valid
- ingestionConfig =
- new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(streamConfigs)), null, null, null,
null);
- tableConfig.setIngestionConfig(ingestionConfig);
+ ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Collections.singletonList(streamConfigs)));
TableConfigUtils.validateIngestionConfig(tableConfig, null);
streamConfigs.remove(StreamConfigProperties.STREAM_TYPE);
@@ -645,11 +543,12 @@ public class TableConfigUtilsTest {
batchConfigMap.put(BatchConfigProperties.INPUT_FORMAT, "avro");
batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS,
"org.foo.Reader");
- IngestionConfig ingestionConfig =
- new IngestionConfig(new
BatchIngestionConfig(Lists.newArrayList(batchConfigMap, batchConfigMap), null,
null),
- null, null, null, null, null);
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ // TODO: Check if we should allow duplicate config maps
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Arrays.asList(batchConfigMap,
batchConfigMap), null, null));
TableConfig tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable_OFFLINE").setIngestionConfig(ingestionConfig)
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIngestionConfig(ingestionConfig)
.build();
TableConfigUtils.validateIngestionConfig(tableConfig, null);
}
@@ -665,15 +564,15 @@ public class TableConfigUtilsTest {
batchConfigMap.put(BatchConfigProperties.RECORD_READER_CLASS,
"org.foo.Reader");
// valid dimension table ingestion config
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"REFRESH", null));
TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
- .setIngestionConfig(new IngestionConfig(
- new BatchIngestionConfig(Lists.newArrayList(batchConfigMap,
batchConfigMap), "REFRESH", null), null, null,
- null, null, null)).build();
+ .setIngestionConfig(ingestionConfig).build();
TableConfigUtils.validateIngestionConfig(tableConfig, null);
// dimension tables should have batch ingestion config
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
- .setIngestionConfig(new IngestionConfig(null, null, null, null, null,
null)).build();
+ ingestionConfig.setBatchIngestionConfig(null);
try {
TableConfigUtils.validateIngestionConfig(tableConfig, null);
Assert.fail("Should fail for Dimension table without batch ingestion
config");
@@ -682,10 +581,8 @@ public class TableConfigUtilsTest {
}
// dimension tables should have batch ingestion config of type REFRESH
- tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setIsDimTable(true)
- .setIngestionConfig(new IngestionConfig(
- new BatchIngestionConfig(Lists.newArrayList(batchConfigMap,
batchConfigMap), "APPEND", null), null, null,
- null, null, null)).build();
+ ingestionConfig.setBatchIngestionConfig(
+ new BatchIngestionConfig(Collections.singletonList(batchConfigMap),
"APPEND", null));
try {
TableConfigUtils.validateIngestionConfig(tableConfig, null);
Assert.fail("Should fail for Dimension table with ingestion type APPEND
(should be REFRESH)");
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
index ba604f8b5e..2e7c475ff9 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/BatchIngestionConfig.java
@@ -33,7 +33,7 @@ import org.apache.pinot.spi.config.BaseJsonConfig;
public class BatchIngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Configs for all the batch sources to ingest from")
- private final List<Map<String, String>> _batchConfigMaps;
+ private List<Map<String, String>> _batchConfigMaps;
@JsonPropertyDescription("Ingestion type APPEND or REFRESH")
private String _segmentIngestionType;
@@ -63,6 +63,10 @@ public class BatchIngestionConfig extends BaseJsonConfig {
return _segmentIngestionFrequency;
}
+ public void setBatchConfigMaps(List<Map<String, String>> batchConfigMaps) {
+ _batchConfigMaps = batchConfigMaps;
+ }
+
public void setSegmentIngestionType(String segmentIngestionType) {
_segmentIngestionType = segmentIngestionType;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
index 6b8329410a..ba471d899b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/IngestionConfig.java
@@ -18,8 +18,6 @@
*/
package org.apache.pinot.spi.config.table.ingestion;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import java.util.List;
import javax.annotation.Nullable;
@@ -50,14 +48,11 @@ public class IngestionConfig extends BaseJsonConfig {
@JsonPropertyDescription("Configs related to record aggregation function
applied during ingestion")
private List<AggregationConfig> _aggregationConfigs;
-
- @JsonCreator
- public IngestionConfig(@JsonProperty("batchIngestionConfig") @Nullable
BatchIngestionConfig batchIngestionConfig,
- @JsonProperty("streamIngestionConfig") @Nullable StreamIngestionConfig
streamIngestionConfig,
- @JsonProperty("filterConfig") @Nullable FilterConfig filterConfig,
- @JsonProperty("transformConfigs") @Nullable List<TransformConfig>
transformConfigs,
- @JsonProperty("complexTypeConfig") @Nullable ComplexTypeConfig
complexTypeConfig,
- @JsonProperty("aggregationConfigs") @Nullable List<AggregationConfig>
aggregationConfigs) {
+ @Deprecated
+ public IngestionConfig(@Nullable BatchIngestionConfig batchIngestionConfig,
+ @Nullable StreamIngestionConfig streamIngestionConfig, @Nullable
FilterConfig filterConfig,
+ @Nullable List<TransformConfig> transformConfigs, @Nullable
ComplexTypeConfig complexTypeConfig,
+ @Nullable List<AggregationConfig> aggregationConfigs) {
_batchIngestionConfig = batchIngestionConfig;
_streamIngestionConfig = streamIngestionConfig;
_filterConfig = filterConfig;
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index e9831ea622..f50f45746f 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -19,7 +19,8 @@
package org.apache.pinot.spi.utils;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Lists;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.spi.config.table.IndexingConfig;
@@ -53,11 +54,10 @@ public class IngestionConfigUtilsTest {
new
TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setTimeColumnName("timeColumn").build();
// get from ingestion config (when not present in indexing config)
- Map<String, String> streamConfigMap = new HashMap<>();
- streamConfigMap.put("streamType", "kafka");
- tableConfig.setIngestionConfig(
- new IngestionConfig(null, new
StreamIngestionConfig(Lists.newArrayList(streamConfigMap)), null, null, null,
- null));
+ Map<String, String> streamConfigMap =
Collections.singletonMap("streamType", "kafka");
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(new
StreamIngestionConfig(Collections.singletonList(streamConfigMap)));
+ tableConfig.setIngestionConfig(ingestionConfig);
Map<String, String> actualStreamConfigsMap =
IngestionConfigUtils.getStreamConfigMap(tableConfig);
Assert.assertEquals(actualStreamConfigsMap.size(), 1);
Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
@@ -74,9 +74,8 @@ public class IngestionConfigUtilsTest {
Assert.assertEquals(actualStreamConfigsMap.get("streamType"), "kafka");
// fail if multiple found
- tableConfig.setIngestionConfig(new IngestionConfig(null,
- new StreamIngestionConfig(Lists.newArrayList(streamConfigMap,
deprecatedStreamConfigMap)), null, null, null,
- null));
+ ingestionConfig.setStreamIngestionConfig(
+ new StreamIngestionConfig(Arrays.asList(streamConfigMap,
deprecatedStreamConfigMap)));
try {
IngestionConfigUtils.getStreamConfigMap(tableConfig);
Assert.fail("Should fail for multiple stream configs");
@@ -103,9 +102,10 @@ public class IngestionConfigUtilsTest {
@Test
public void testGetPushFrequency() {
// get from ingestion config, when not present in segmentsConfig
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
- tableConfig.setIngestionConfig(
- new IngestionConfig(new BatchIngestionConfig(null, "APPEND",
"HOURLY"), null, null, null, null, null));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"APPEND", "HOURLY"));
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setIngestionConfig(ingestionConfig).build();
Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionFrequency(tableConfig),
"HOURLY");
// get from ingestion config, even if present in segmentsConfig
@@ -128,9 +128,10 @@ public class IngestionConfigUtilsTest {
@Test
public void testGetPushType() {
// get from ingestion config, when not present in segmentsConfig
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").build();
- tableConfig.setIngestionConfig(
- new IngestionConfig(new BatchIngestionConfig(null, "APPEND",
"HOURLY"), null, null, null, null, null));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null,
"APPEND", "HOURLY"));
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setIngestionConfig(ingestionConfig).build();
Assert.assertEquals(IngestionConfigUtils.getBatchSegmentIngestionType(tableConfig),
"APPEND");
// get from ingestion config, even if present in segmentsConfig
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]