This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6b1ccc5c70 Add Schema to the PinotTaskGenerator validateTaskConfigs
method call (#14683)
6b1ccc5c70 is described below
commit 6b1ccc5c70cbd8ab6533780b1171416e1ae30c1a
Author: Xiang Fu <[email protected]>
AuthorDate: Wed Jan 8 08:24:47 2025 -0800
Add Schema to the PinotTaskGenerator validateTaskConfigs method call
(#14683)
---
.../api/resources/PinotTableRestletResource.java | 12 +++++++-----
.../api/resources/TableConfigsRestletResource.java | 4 ++--
.../core/minion/generator/PinotTaskGenerator.java | 4 +++-
.../pinot/controller/util/TaskConfigUtils.java | 5 +++--
.../pinot/controller/util/TaskConfigUtilsTest.java | 11 ++++++-----
.../RealtimeToOfflineSegmentsTaskGenerator.java | 6 +++---
.../UpsertCompactionTaskGenerator.java | 3 ++-
.../UpsertCompactMergeTaskGenerator.java | 3 ++-
.../RealtimeToOfflineSegmentsTaskGeneratorTest.java | 20 ++++++++++----------
.../UpsertCompactionTaskGeneratorTest.java | 15 +++++++++------
.../UpsertCompactMergeTaskGeneratorTest.java | 19 +++++++++++--------
11 files changed, 58 insertions(+), 44 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 8c67df32b3..638849df46 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -211,6 +211,7 @@ public class PinotTableRestletResource {
Pair<TableConfig, Map<String, Object>>
tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
String tableNameWithType;
+ Schema schema;
try {
tableConfigAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigStr,
TableConfig.class);
@@ -224,7 +225,7 @@ public class PinotTableRestletResource {
ResourceUtils.checkPermissionAndAccess(tableNameWithType, request,
httpHeaders,
AccessType.CREATE, Actions.Table.CREATE_TABLE,
_accessControlFactory, LOGGER);
- Schema schema =
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigTunerUtils.applyTunerConfigs(_pinotHelixResourceManager,
tableConfig, schema, Collections.emptyMap());
@@ -239,7 +240,7 @@ public class PinotTableRestletResource {
TableConfigUtils.ensureMinReplicas(tableConfig,
_controllerConf.getDefaultTableMinReplicas());
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig,
_controllerConf.getDimTableMaxSize());
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType),
tableConfig);
- TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager,
typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, schema,
_pinotTaskManager, typesToSkip);
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
@@ -481,6 +482,7 @@ public class PinotTableRestletResource {
Pair<TableConfig, Map<String, Object>>
tableConfigAndUnrecognizedProperties;
TableConfig tableConfig;
String tableNameWithType;
+ Schema schema;
try {
tableConfigAndUnrecognizedProperties =
JsonUtils.stringToObjectAndUnrecognizedProperties(tableConfigString,
TableConfig.class);
@@ -497,7 +499,7 @@ public class PinotTableRestletResource {
Response.Status.BAD_REQUEST);
}
- Schema schema =
_pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
+ schema = _pinotHelixResourceManager.getSchemaForTableConfig(tableConfig);
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
} catch (Exception e) {
String msg = String.format("Invalid table config: %s with error: %s",
tableName, e.getMessage());
@@ -514,7 +516,7 @@ public class PinotTableRestletResource {
TableConfigUtils.ensureMinReplicas(tableConfig,
_controllerConf.getDefaultTableMinReplicas());
TableConfigUtils.ensureStorageQuotaConstraints(tableConfig,
_controllerConf.getDimTableMaxSize());
checkHybridTableConfig(TableNameBuilder.extractRawTableName(tableNameWithType),
tableConfig);
- TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager,
typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, schema,
_pinotTaskManager, typesToSkip);
} catch (Exception e) {
throw new InvalidTableConfigException(e);
}
@@ -575,7 +577,7 @@ public class PinotTableRestletResource {
throw new SchemaNotFoundException("Got empty schema");
}
TableConfigUtils.validate(tableConfig, schema, typesToSkip);
- TaskConfigUtils.validateTaskConfigs(tableConfig, _pinotTaskManager,
typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, schema,
_pinotTaskManager, typesToSkip);
ObjectNode tableConfigValidateStr = JsonUtils.newObjectNode();
if (tableConfig.getTableType() == TableType.OFFLINE) {
tableConfigValidateStr.set(TableType.OFFLINE.name(),
tableConfig.toJsonNode());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
index 5d55df6095..82a9f164ea 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/TableConfigsRestletResource.java
@@ -462,7 +462,7 @@ public class TableConfigsRestletResource {
"Name in 'offline' table config: %s must be equal to 'tableName':
%s", offlineRawTableName, rawTableName);
TableConfigUtils.validateTableName(offlineTableConfig);
TableConfigUtils.validate(offlineTableConfig, schema, typesToSkip);
- TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(),
_pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfigs.getOffline(), schema,
_pinotTaskManager, typesToSkip);
}
if (realtimeTableConfig != null) {
String realtimeRawTableName = DatabaseUtils.translateTableName(
@@ -471,7 +471,7 @@ public class TableConfigsRestletResource {
"Name in 'realtime' table config: %s must be equal to 'tableName':
%s", realtimeRawTableName, rawTableName);
TableConfigUtils.validateTableName(realtimeTableConfig);
TableConfigUtils.validate(realtimeTableConfig, schema, typesToSkip);
- TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(),
_pinotTaskManager, typesToSkip);
+ TaskConfigUtils.validateTaskConfigs(tableConfigs.getRealtime(),
schema, _pinotTaskManager, typesToSkip);
}
if (offlineTableConfig != null && realtimeTableConfig != null) {
TableConfigUtils.verifyHybridTableConfigs(rawTableName,
offlineTableConfig, realtimeTableConfig);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
index 9be76f253d..8d5d9bedcc 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/PinotTaskGenerator.java
@@ -25,6 +25,7 @@ import
org.apache.pinot.controller.helix.core.minion.ClusterInfoAccessor;
import org.apache.pinot.core.common.MinionConstants;
import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -103,8 +104,9 @@ public interface PinotTaskGenerator {
/**
* Performs task type specific validations for the given task type.
* @param tableConfig The table configuration that is getting
added/updated/validated.
+ * @param schema The schema of the table.
* @param taskConfigs The task type specific task configuration to be
validated.
*/
- default void validateTaskConfigs(TableConfig tableConfig, Map<String,
String> taskConfigs) {
+ default void validateTaskConfigs(TableConfig tableConfig, Schema schema,
Map<String, String> taskConfigs) {
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
index 059908ea8d..53ba300934 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TaskConfigUtils.java
@@ -26,6 +26,7 @@ import
org.apache.pinot.controller.helix.core.minion.generator.PinotTaskGenerator;
import
org.apache.pinot.controller.helix.core.minion.generator.TaskGeneratorRegistry;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
+import org.apache.pinot.spi.data.Schema;
import org.quartz.CronScheduleBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +41,7 @@ public class TaskConfigUtils {
private TaskConfigUtils() {
}
- public static void validateTaskConfigs(TableConfig tableConfig,
PinotTaskManager pinotTaskManager,
+ public static void validateTaskConfigs(TableConfig tableConfig, Schema
schema, PinotTaskManager pinotTaskManager,
String validationTypesToSkip) {
if (tableConfig == null || tableConfig.getTaskConfig() == null) {
return;
@@ -59,7 +60,7 @@ public class TaskConfigUtils {
if (taskGenerator != null) {
Map<String, String> taskConfigs = taskConfigEntry.getValue();
doCommonTaskValidations(tableConfig, taskType, taskConfigs);
- taskGenerator.validateTaskConfigs(tableConfig, taskConfigs);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, taskConfigs);
} else {
throw new RuntimeException(String.format("Task generator not found
for task type: %s, while validating table "
+ "configs for table: %s", taskType,
tableConfig.getTableName()));
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
index 000bf9826c..6d4753fed8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/util/TaskConfigUtilsTest.java
@@ -30,6 +30,7 @@ import org.apache.pinot.core.minion.PinotTaskConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -64,7 +65,7 @@ public class TaskConfigUtilsTest {
}
@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String,
String> taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema,
Map<String, String> taskConfigs) {
throw new RuntimeException("TableConfig validation failed");
}
};
@@ -73,22 +74,22 @@ public class TaskConfigUtilsTest {
when(_mockTaskManager.getTaskGeneratorRegistry()).thenReturn(_mockTaskRegistry);
}
- @Test (expectedExceptions = RuntimeException.class)
+ @Test(expectedExceptions = RuntimeException.class)
public void testValidateTableTaskConfigsValidationException() {
TableTaskConfig tableTaskConfig =
new TableTaskConfig(ImmutableMap.of(TEST_TASK_TYPE,
ImmutableMap.of("schedule", "0 */10 * ? * * *")));
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
- TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(),
_mockTaskManager, null);
}
- @Test (expectedExceptions = RuntimeException.class)
+ @Test(expectedExceptions = RuntimeException.class)
public void testValidateTableTaskConfigsUnknownTaskType() {
TableTaskConfig tableTaskConfig =
new TableTaskConfig(ImmutableMap.of("otherTask",
ImmutableMap.of("schedule", "0 */10 * ? * * *")));
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setTaskConfig(tableTaskConfig).build();
- TaskConfigUtils.validateTaskConfigs(tableConfig, _mockTaskManager, null);
+ TaskConfigUtils.validateTaskConfigs(tableConfig, new Schema(),
_mockTaskManager, null);
}
@Test
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index 73ff19ebef..128610ae64 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -321,7 +321,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends
BaseTaskGenerator {
}
@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String>
taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema,
Map<String, String> taskConfigs) {
// check table is not upsert
Preconditions.checkState(tableConfig.getUpsertMode() ==
UpsertConfig.Mode.NONE,
"RealtimeToOfflineTask doesn't support upsert table!");
@@ -336,8 +336,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends
BaseTaskGenerator {
Preconditions.checkState(ImmutableSet.of(MergeType.CONCAT.name(),
MergeType.ROLLUP.name(), MergeType.DEDUP.name())
.contains(taskConfigs.getOrDefault(RealtimeToOfflineSegmentsTask.MERGE_TYPE_KEY,
MergeType.CONCAT.name())
.toUpperCase()), "MergeType must be one of [CONCAT, ROLLUP,
DEDUP]!");
-
- Schema schema =
_clusterInfoAccessor.getPinotHelixResourceManager().getSchemaForTableConfig(tableConfig);
+ // check schema is not null
+ Preconditions.checkNotNull(schema, "Schema should not be null!");
// check no mis-configured columns
Set<String> columnNames = schema.getColumnNames();
for (Map.Entry<String, String> entry : taskConfigs.entrySet()) {
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
index 77aaefc069..6be851682b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGenerator.java
@@ -45,6 +45,7 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
@@ -289,7 +290,7 @@ public class UpsertCompactionTaskGenerator extends
BaseTaskGenerator {
}
@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String>
taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema,
Map<String, String> taskConfigs) {
// check table is realtime
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
"UpsertCompactionTask only supports realtime tables!");
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
index b15bff8698..3c3df0bd4d 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGenerator.java
@@ -47,6 +47,7 @@ import org.apache.pinot.spi.annotations.minion.TaskGenerator;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.DataSizeUtils;
import org.apache.pinot.spi.utils.TimeUtils;
import org.slf4j.Logger;
@@ -425,7 +426,7 @@ public class UpsertCompactMergeTaskGenerator extends
BaseTaskGenerator {
}
@Override
- public void validateTaskConfigs(TableConfig tableConfig, Map<String, String>
taskConfigs) {
+ public void validateTaskConfigs(TableConfig tableConfig, Schema schema,
Map<String, String> taskConfigs) {
// check table is realtime
Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
String.format("%s only supports realtime tables!",
MinionConstants.UpsertCompactMergeTask.TASK_TYPE));
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
index 49a9fd8d57..754f7224a2 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGeneratorTest.java
@@ -541,7 +541,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
// validate valid config
- taskGenerator.validateTaskConfigs(tableConfig,
realtimeToOfflineTaskConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema,
realtimeToOfflineTaskConfig);
// invalid Upsert config with RealtimeToOfflineTask
tableConfig =
@@ -550,7 +550,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
ImmutableMap.of("RealtimeToOfflineSegmentsTask",
realtimeToOfflineTaskConfig,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig,
realtimeToOfflineTaskConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema,
realtimeToOfflineTaskConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("RealtimeToOfflineTask doesn't
support upsert table"));
@@ -564,7 +564,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
ImmutableMap.of("RealtimeToOfflineSegmentsTask",
invalidPeriodConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidPeriodConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema,
invalidPeriodConfig);
Assert.fail();
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains("Invalid time spec"));
@@ -578,7 +578,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidMergeType,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidMergeType);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidMergeType);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("MergeType must be one of"));
@@ -592,7 +592,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
ImmutableMap.of("RealtimeToOfflineSegmentsTask",
invalidColumnConfig, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidColumnConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema,
invalidColumnConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("not found in schema"));
@@ -606,7 +606,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
ImmutableMap.of("RealtimeToOfflineSegmentsTask", invalidAggConfig,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidAggConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, invalidAggConfig);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
@@ -620,7 +620,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
ImmutableMap.of("RealtimeToOfflineSegmentsTask",
invalidAgg2Config, "SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
try {
- taskGenerator.validateTaskConfigs(tableConfig, invalidAgg2Config);
+ taskGenerator.validateTaskConfigs(tableConfig, schema,
invalidAgg2Config);
Assert.fail();
} catch (IllegalStateException e) {
Assert.assertTrue(e.getMessage().contains("has invalid aggregate type"));
@@ -633,7 +633,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
new TableTaskConfig(
ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAggConfig,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
- taskGenerator.validateTaskConfigs(tableConfig, validAggConfig);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, validAggConfig);
// valid agg
HashMap<String, String> validAgg2Config = new
HashMap<>(realtimeToOfflineTaskConfig);
@@ -642,7 +642,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
new TableTaskConfig(
ImmutableMap.of("RealtimeToOfflineSegmentsTask", validAgg2Config,
"SegmentGenerationAndPushTask",
segmentGenerationAndPushTaskConfig))).build();
- taskGenerator.validateTaskConfigs(tableConfig, validAgg2Config);
+ taskGenerator.validateTaskConfigs(tableConfig, schema, validAgg2Config);
}
private SegmentZKMetadata getSegmentZKMetadata(String segmentName, Status
status, long startTime, long endTime,
@@ -659,7 +659,7 @@ public class RealtimeToOfflineSegmentsTaskGeneratorTest {
private IdealState getIdealState(String tableName, List<String>
segmentNames) {
IdealState idealState = new IdealState(tableName);
idealState.setRebalanceMode(IdealState.RebalanceMode.CUSTOMIZED);
- for (String segmentName: segmentNames) {
+ for (String segmentName : segmentNames) {
idealState.setPartitionState(segmentName, "Server_0", "ONLINE");
}
return idealState;
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
index 1204c5ae5f..f4a31c180b 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompaction/UpsertCompactionTaskGeneratorTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.TimeUtils;
@@ -327,7 +328,7 @@ public class UpsertCompactionTaskGeneratorTest {
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask",
upsertCompactionTaskConfig)))
.build();
- _taskGenerator.validateTaskConfigs(tableConfig,
upsertCompactionTaskConfig);
+ _taskGenerator.validateTaskConfigs(tableConfig, new Schema(),
upsertCompactionTaskConfig);
// test with invalidRecordsThresholdPercents as 0
Map<String, String> upsertCompactionTaskConfig1 =
ImmutableMap.of("invalidRecordsThresholdPercent", "0");
@@ -335,7 +336,7 @@ public class UpsertCompactionTaskGeneratorTest {
.setUpsertConfig(upsertConfig)
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask",
upsertCompactionTaskConfig1)))
.build();
- _taskGenerator.validateTaskConfigs(zeroPercentTableConfig,
upsertCompactionTaskConfig1);
+ _taskGenerator.validateTaskConfigs(zeroPercentTableConfig, new Schema(),
upsertCompactionTaskConfig1);
// test with invalid invalidRecordsThresholdPercents as -1 and 110
Map<String, String> upsertCompactionTaskConfig2 =
ImmutableMap.of("invalidRecordsThresholdPercent", "-1");
@@ -344,14 +345,16 @@ public class UpsertCompactionTaskGeneratorTest {
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask",
upsertCompactionTaskConfig2)))
.build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(negativePercentTableConfig,
upsertCompactionTaskConfig2));
+ () -> _taskGenerator.validateTaskConfigs(negativePercentTableConfig,
new Schema(),
+ upsertCompactionTaskConfig2));
Map<String, String> upsertCompactionTaskConfig3 =
ImmutableMap.of("invalidRecordsThresholdPercent", "110");
TableConfig hundredTenPercentTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask",
upsertCompactionTaskConfig3)))
.build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(hundredTenPercentTableConfig,
upsertCompactionTaskConfig3));
+ () -> _taskGenerator.validateTaskConfigs(hundredTenPercentTableConfig,
new Schema(),
+ upsertCompactionTaskConfig3));
// test with invalid invalidRecordsThresholdCount
Map<String, String> upsertCompactionTaskConfig4 =
ImmutableMap.of("invalidRecordsThresholdCount", "0");
@@ -360,7 +363,7 @@ public class UpsertCompactionTaskGeneratorTest {
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask",
upsertCompactionTaskConfig4)))
.build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(invalidCountTableConfig,
upsertCompactionTaskConfig4));
+ () -> _taskGenerator.validateTaskConfigs(invalidCountTableConfig, new
Schema(), upsertCompactionTaskConfig4));
// test without invalidRecordsThresholdPercent or
invalidRecordsThresholdCount
Map<String, String> upsertCompactionTaskConfig5 =
ImmutableMap.of("bufferTimePeriod", "5d");
@@ -369,7 +372,7 @@ public class UpsertCompactionTaskGeneratorTest {
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of("UpsertCompactionTask",
upsertCompactionTaskConfig5)))
.build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(invalidTableConfig,
upsertCompactionTaskConfig5));
+ () -> _taskGenerator.validateTaskConfigs(invalidTableConfig, new
Schema(), upsertCompactionTaskConfig5));
}
private Map<String, String> getCompactionConfigs(String
invalidRecordsThresholdPercent,
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
index 3709392e76..7e4fbda5f5 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/test/java/org/apache/pinot/plugin/minion/tasks/upsertcompactmerge/UpsertCompactMergeTaskGeneratorTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.UpsertConfig;
+import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
@@ -96,11 +97,11 @@ public class UpsertCompactMergeTaskGeneratorTest {
ImmutableMap.of("bufferTimePeriod", "5d");
TableConfig offlineTableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTaskConfig(
- new
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
- upsertCompactMergeTaskConfig)))
- .build();
+ new
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
+ upsertCompactMergeTaskConfig)))
+ .build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(offlineTableConfig,
upsertCompactMergeTaskConfig));
+ () -> _taskGenerator.validateTaskConfigs(offlineTableConfig, new
Schema(), upsertCompactMergeTaskConfig));
// check with non-upsert REALTIME table
TableConfig nonUpsertRealtimetableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
@@ -109,7 +110,8 @@ public class UpsertCompactMergeTaskGeneratorTest {
.build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimetableConfig,
upsertCompactMergeTaskConfig));
+ () -> _taskGenerator.validateTaskConfigs(nonUpsertRealtimetableConfig,
new Schema(),
+ upsertCompactMergeTaskConfig));
// check with snapshot disabled
TableConfig disabledSnapshotTableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
@@ -118,7 +120,8 @@ public class UpsertCompactMergeTaskGeneratorTest {
upsertCompactMergeTaskConfig)))
.build();
Assert.assertThrows(IllegalStateException.class,
- () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig,
upsertCompactMergeTaskConfig));
+ () -> _taskGenerator.validateTaskConfigs(disabledSnapshotTableConfig,
new Schema(),
+ upsertCompactMergeTaskConfig));
// valid table configs
UpsertConfig upsertConfig = new UpsertConfig(UpsertConfig.Mode.FULL);
@@ -128,13 +131,13 @@ public class UpsertCompactMergeTaskGeneratorTest {
.setTaskConfig(new
TableTaskConfig(ImmutableMap.of(MinionConstants.UpsertCompactMergeTask.TASK_TYPE,
upsertCompactMergeTaskConfig)))
.build();
- _taskGenerator.validateTaskConfigs(validTableConfig,
upsertCompactMergeTaskConfig);
+ _taskGenerator.validateTaskConfigs(validTableConfig, new Schema(),
upsertCompactMergeTaskConfig);
// invalid buffer time period
Map<String, String> upsertCompactMergeTaskConfig1 =
ImmutableMap.of("bufferTimePeriod", "5hd");
Assert.assertThrows(IllegalArgumentException.class,
- () -> _taskGenerator.validateTaskConfigs(validTableConfig,
upsertCompactMergeTaskConfig1));
+ () -> _taskGenerator.validateTaskConfigs(validTableConfig, new
Schema(), upsertCompactMergeTaskConfig1));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]