This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b9e8d96 Move ExpressionEvaluators and SchemaFieldExtractor from
pinot-spi to pinot-core (#5310)
b9e8d96 is described below
commit b9e8d96776caf57c812bb2de92e2c654dff7df30
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Apr 27 16:17:41 2020 -0700
Move ExpressionEvaluators and SchemaFieldExtractor from pinot-spi to
pinot-core (#5310)
---
.../api/resources/PinotSchemaRestletResource.java | 9 +++---
.../data/function}/DefaultTimeSpecEvaluator.java | 2 +-
.../core/data/function}/ExpressionEvaluator.java | 2 +-
.../data/function}/ExpressionEvaluatorFactory.java | 13 ++++----
.../data/function}/GroovyExpressionEvaluator.java | 2 +-
.../realtime/HLRealtimeSegmentDataManager.java | 4 +--
.../realtime/LLRealtimeSegmentDataManager.java | 4 +--
.../manager/realtime/RealtimeTableDataManager.java | 3 +-
.../recordtransformer/ExpressionTransformer.java | 4 +--
.../impl/SegmentIndexCreationDriverImpl.java | 4 +--
.../org/apache/pinot/core/util/SchemaUtils.java | 37 ++++++++++++++--------
.../function}/GroovyExpressionEvaluatorTest.java | 2 +-
.../ExpressionTransformerTimeTest.java | 13 ++------
.../impl/fakestream/FakeStreamConsumerFactory.java | 4 +--
.../apache/pinot/core/util/SchemaUtilsTest.java | 34 ++++++++++----------
.../tests/MapTypeClusterIntegrationTest.java | 10 +++---
.../avro/AvroRecordToPinotRowGeneratorTest.java | 4 +--
.../java/org/apache/pinot/spi/data/Schema.java | 3 +-
.../tools/admin/command/CreateSegmentCommand.java | 4 +--
.../tools/config/validator/SchemaValidator.java | 3 +-
.../pinot/tools/streams/MeetupRsvpStream.java | 4 +--
21 files changed, 84 insertions(+), 81 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
index 362fb2e..12841d6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
@@ -48,6 +48,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
import org.apache.pinot.controller.api.events.SchemaEventType;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -169,7 +170,7 @@ public class PinotSchemaRestletResource {
@ApiResponses(value = {@ApiResponse(code = 200, message = "Successfully
validated schema"), @ApiResponse(code = 400, message = "Missing or invalid
request body"), @ApiResponse(code = 500, message = "Internal error")})
public String validateSchema(FormDataMultiPart multiPart) {
Schema schema = getSchemaFromMultiPart(multiPart);
- if (!schema.validate(LOGGER)) {
+ if (!SchemaUtils.validate(schema, LOGGER)) {
throw new ControllerApplicationException(LOGGER, "Invalid schema. Check
controller logs",
Response.Status.BAD_REQUEST);
}
@@ -184,7 +185,7 @@ public class PinotSchemaRestletResource {
+ "from 'GET /schema/{schemaName}'. This allows us to validate schema
before apply.")
@ApiResponses(value = {@ApiResponse(code = 200, message = "Successfully
validated schema"), @ApiResponse(code = 400, message = "Missing or invalid
request body"), @ApiResponse(code = 500, message = "Internal error")})
public String validateSchema(Schema schema) {
- if (!schema.validate(LOGGER)) {
+ if (!SchemaUtils.validate(schema, LOGGER)) {
throw new ControllerApplicationException(LOGGER, "Invalid schema. Check
controller logs",
Response.Status.BAD_REQUEST);
}
@@ -198,7 +199,7 @@ public class PinotSchemaRestletResource {
* @return
*/
private SuccessResponse addSchema(Schema schema, boolean override) {
- if (!schema.validate(LOGGER)) {
+ if (!SchemaUtils.validate(schema, LOGGER)) {
throw new ControllerApplicationException(LOGGER, "Cannot add invalid
schema " + schema.getSchemaName(),
Response.Status.BAD_REQUEST);
}
@@ -226,7 +227,7 @@ public class PinotSchemaRestletResource {
* @return
*/
private SuccessResponse updateSchema(String schemaName, Schema schema,
boolean reload) {
- if (!schema.validate(LOGGER)) {
+ if (!SchemaUtils.validate(schema, LOGGER)) {
throw new ControllerApplicationException(LOGGER, "Cannot add invalid
schema: " + schemaName,
Response.Status.BAD_REQUEST);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/DefaultTimeSpecEvaluator.java
similarity index 98%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/data/function/DefaultTimeSpecEvaluator.java
index dcb242d..a0466a0 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/DefaultTimeSpecEvaluator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
import com.google.common.base.Preconditions;
import java.util.Collections;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluator.java
similarity index 96%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluator.java
index e2bf8c4..dc51f0b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
import java.util.List;
import org.apache.pinot.spi.data.readers.GenericRow;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluatorFactory.java
similarity index 90%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluatorFactory.java
rename to pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluatorFactory.java
index db73f13..0959581 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluatorFactory.java
@@ -16,13 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -78,18 +78,17 @@ public class ExpressionEvaluatorFactory {
}
}
- } else if
(columnName.endsWith(SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX)) {
+ } else if (columnName.endsWith(SchemaUtils.MAP_KEY_COLUMN_SUFFIX)) {
// for backward compatible handling of Map type (currently only in Avro)
String sourceMapName =
- columnName.substring(0, columnName.length() -
SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX.length());
+ columnName.substring(0, columnName.length() -
SchemaUtils.MAP_KEY_COLUMN_SUFFIX.length());
String defaultMapKeysTransformExpression =
getDefaultMapKeysTransformExpression(sourceMapName);
expressionEvaluator =
getExpressionEvaluator(defaultMapKeysTransformExpression);
- } else if
(columnName.endsWith(SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX)) {
-
+ } else if (columnName.endsWith(SchemaUtils.MAP_VALUE_COLUMN_SUFFIX)) {
// for backward compatible handling of Map type in avro (currently only
in Avro)
String sourceMapName =
- columnName.substring(0, columnName.length() -
SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX.length());
+ columnName.substring(0, columnName.length() -
SchemaUtils.MAP_VALUE_COLUMN_SUFFIX.length());
String defaultMapValuesTransformExpression =
getDefaultMapValuesTransformExpression(sourceMapName);
expressionEvaluator =
getExpressionEvaluator(defaultMapValuesTransformExpression);
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluator.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluator.java
similarity index 98%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluator.java
rename to pinot-core/src/main/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluator.java
index 68ecfa6..b0362c9 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluator.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index a2de83b..1edab27 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,7 +51,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -168,7 +168,7 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
// create and init stream level consumer
_streamConsumerFactory =
StreamConsumerFactoryProvider.create(_streamConfig);
String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-"
+ _streamConfig.getTopicName();
- Set<String> sourceFields =
SchemaFieldExtractorUtils.extractSourceFields(schema);
+ Set<String> sourceFields = SchemaUtils.extractSourceFields(schema);
_streamLevelConsumer = _streamConsumerFactory
.createStreamLevelConsumer(clientId, tableNameWithType, schema,
instanceMetadata.getGroupId(tableNameWithType),
sourceFields);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 628a4d7..35aec2f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -76,7 +76,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.TransientConsumerException;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -1153,7 +1153,7 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
.setConsumerDir(consumerDir);
// Create message decoder
- _sourceFields = SchemaFieldExtractorUtils.extractSourceFields(_schema);
+ _sourceFields = SchemaUtils.extractSourceFields(_schema);
_messageDecoder =
StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema,
_sourceFields);
_clientId = _streamTopic + "-" + _streamPartitionId;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0143eed..697ce5a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -51,6 +51,7 @@ import
org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.core.segment.index.loader.LoaderUtils;
import
org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -353,7 +354,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
}
// 2. We want to get the schema errors, if any, even if isValid is false;
- if (!schema.validate(_logger)) {
+ if (!SchemaUtils.validate(schema, _logger)) {
isValid = false;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
index 2619dfb..facdcf7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
@@ -23,8 +23,8 @@ import java.util.Map;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
-import
org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
+import org.apache.pinot.core.data.function.ExpressionEvaluator;
+import org.apache.pinot.core.data.function.ExpressionEvaluatorFactory;
/**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 2249271..5645c47 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -57,7 +57,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -98,7 +98,7 @@ public class SegmentIndexCreationDriverImpl implements
SegmentIndexCreationDrive
Schema schema = segmentGeneratorConfig.getSchema();
FileFormat fileFormat = segmentGeneratorConfig.getFormat();
String recordReaderClassName =
segmentGeneratorConfig.getRecordReaderPath();
- Set<String> fields =
SchemaFieldExtractorUtils.extractSourceFields(segmentGeneratorConfig.getSchema());
+ Set<String> fields =
SchemaUtils.extractSourceFields(segmentGeneratorConfig.getSchema());
// Allow for instantiation general record readers from a record reader
path passed into segment generator config
// If this is set, this will override the file format
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
similarity index 80%
rename from pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
rename to pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
index 0c34046..d152647 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
@@ -16,14 +16,13 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.utils;
+package org.apache.pinot.core.util;
-import com.google.common.annotations.VisibleForTesting;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
-import
org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
+import org.apache.pinot.core.data.function.ExpressionEvaluator;
+import org.apache.pinot.core.data.function.ExpressionEvaluatorFactory;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -33,13 +32,14 @@ import org.slf4j.LoggerFactory;
/**
- * Extracts names of the source fields from the schema
+ * Schema utils
+ * FIXME: Merge this SchemaUtils with the SchemaUtils from pinot-common when
merging of modules happens
*/
-public class SchemaFieldExtractorUtils {
+public class SchemaUtils {
public static final String MAP_KEY_COLUMN_SUFFIX = "__KEYS";
public static final String MAP_VALUE_COLUMN_SUFFIX = "__VALUES";
- private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaFieldExtractorUtils.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SchemaUtils.class);
/**
* Extracts the source fields and destination fields from the schema
@@ -67,6 +67,16 @@ public class SchemaFieldExtractorUtils {
* i.e. do not allow using source column name for destination column
*/
public static boolean validate(Schema schema) {
+ return validate(schema, LOGGER);
+ }
+
+ /**
+ * Validates the following:
+ * 1) for a field spec with transform function, the source column name and
destination column name are exclusive
+ * i.e. do not allow using source column name for destination column
+ * 2) Basic schema validations
+ */
+ public static boolean validate(Schema schema, Logger logger) {
try {
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
if (!fieldSpec.isVirtualColumn()) {
@@ -78,7 +88,7 @@ public class SchemaFieldExtractorUtils {
List<String> arguments = expressionEvaluator.getArguments();
// output column used as input
if (arguments.contains(column)) {
- LOGGER.error("The arguments of transform function: {}, should
not contain the destination column: {}",
+ logger.error("The arguments of transform function: {}, should
not contain the destination column: {}",
transformFunction, column);
return false;
}
@@ -91,13 +101,14 @@ public class SchemaFieldExtractorUtils {
if (!incomingGranularitySpec.equals(outgoingGranularitySpec)) {
// different incoming and outgoing spec, but same name
if
(incomingGranularitySpec.getName().equals(outgoingGranularitySpec.getName())) {
- LOGGER.error("Cannot convert from incoming field spec:{} to
outgoing field spec:{} if name is the same",
+ logger.error("Cannot convert from incoming field spec:{} to
outgoing field spec:{} if name is the same",
incomingGranularitySpec, outgoingGranularitySpec);
return false;
} else {
- if
(!incomingGranularitySpec.getTimeFormat().equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())
|| !outgoingGranularitySpec.getTimeFormat()
+ if
(!incomingGranularitySpec.getTimeFormat().equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())
+ || !outgoingGranularitySpec.getTimeFormat()
.equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) {
- LOGGER.error(
+ logger.error(
"When incoming and outgoing specs are different, cannot
perform time conversion for time format other than EPOCH");
return false;
}
@@ -107,9 +118,9 @@ public class SchemaFieldExtractorUtils {
}
}
} catch (Exception e) {
- LOGGER.error("Exception in validating schema {}",
schema.getSchemaName(), e);
+ logger.error("Exception in validating schema {}",
schema.getSchemaName(), e);
return false;
}
- return true;
+ return schema.validate(logger);
}
}
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluatorTest.java
similarity index 98%
rename from pinot-spi/src/test/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluatorTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluatorTest.java
index 7d24725..0ff8fad 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluatorTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
index 2e1bbbf..aa86b69 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
@@ -82,9 +82,10 @@ public class ExpressionTransformerTimeTest {
// 3] both incoming and outgoing defined - same column name only - skip
conversion - this shouldn't be allowed by validation during add schema
try {
- new Schema.SchemaBuilder()
+ pinotSchema = new Schema.SchemaBuilder()
.addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "time"),
new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS,
"time")).build();
+ new ExpressionTransformer(pinotSchema);
Assert.fail();
} catch (Exception e) {
// expected
@@ -168,15 +169,5 @@ public class ExpressionTransformerTimeTest {
genericRow.putValue("time", 20180101);
expressionTransformer.transform(genericRow);
Assert.assertEquals(genericRow.getValue("time"), 20180101);
-
- // When incoming and outgoing spec are not the same, simple date format is
not allowed
- try {
- new Schema.SchemaBuilder().addTime(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
- TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(),
"incoming"),
- new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.SECONDS,
"outgoing")).build();
- fail();
- } catch (Exception e) {
- // Expected
- }
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index a573d58..40356c9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -31,7 +31,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
/**
@@ -94,7 +94,7 @@ public class FakeStreamConsumerFactory extends
StreamConsumerFactory {
// Message decoder
Schema pinotSchema = FakeStreamConfigUtils.getPinotSchema();
StreamMessageDecoder streamMessageDecoder =
StreamDecoderProvider.create(streamConfig, pinotSchema,
- SchemaFieldExtractorUtils.extractSourceFields(pinotSchema));
+ SchemaUtils.extractSourceFields(pinotSchema));
GenericRow decodedRow = new GenericRow();
streamMessageDecoder.decode(messageBatch.getMessageAtIndex(0), decodedRow);
System.out.println(decodedRow);
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
similarity index 84%
rename from pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
rename to pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 7c1f038..43f1812 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pinot.spi.utils;
+package org.apache.pinot.core.util;
import java.util.ArrayList;
import java.util.Arrays;
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
/**
* Tests that the source field names are extracted correctly
*/
-public class SchemaFieldExtractorUtilsTest {
+public class SchemaUtilsTest {
@Test
public void testSourceFieldExtractorName() {
@@ -48,7 +48,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1,
argument2)");
schema.addField(dimensionFieldSpec);
- List<String> extract = new
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+ List<String> extract = new
ArrayList<>(SchemaUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 3);
Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1",
"argument2")));
@@ -58,7 +58,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec.setTransformFunction("Groovy({function})");
schema.addField(dimensionFieldSpec);
- extract = new
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+ extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 1);
Assert.assertTrue(extract.contains("d1"));
@@ -67,7 +67,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec = new DimensionFieldSpec("map__KEYS",
FieldSpec.DataType.INT, false);
schema.addField(dimensionFieldSpec);
- extract = new
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+ extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__KEYS")));
@@ -76,7 +76,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec = new DimensionFieldSpec("map__VALUES",
FieldSpec.DataType.LONG, false);
schema.addField(dimensionFieldSpec);
- extract = new
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+ extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("map",
"map__VALUES")));
@@ -86,7 +86,7 @@ public class SchemaFieldExtractorUtilsTest {
TimeFieldSpec timeFieldSpec = new TimeFieldSpec("time",
FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS);
schema.addField(timeFieldSpec);
- extract = new
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+ extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 1);
Assert.assertTrue(extract.contains("time"));
@@ -97,7 +97,7 @@ public class SchemaFieldExtractorUtilsTest {
TimeUnit.MILLISECONDS);
schema.addField(timeFieldSpec);
- extract = new
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+ extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out")));
}
@@ -110,26 +110,26 @@ public class SchemaFieldExtractorUtilsTest {
DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("dim1",
FieldSpec.DataType.STRING, true);
dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1,
dim1, argument3)");
pinotSchema.addField(dimensionFieldSpec);
- Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertFalse(SchemaUtils.validate(pinotSchema));
pinotSchema = new Schema();
MetricFieldSpec metricFieldSpec = new MetricFieldSpec("m1",
FieldSpec.DataType.LONG);
metricFieldSpec.setTransformFunction("Groovy({function}, m1, m1)");
pinotSchema.addField(metricFieldSpec);
- Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertFalse(SchemaUtils.validate(pinotSchema));
pinotSchema = new Schema();
TimeFieldSpec timeFieldSpec = new TimeFieldSpec("time",
FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS);
timeFieldSpec.setTransformFunction("Groovy({function}, time)");
pinotSchema.addField(timeFieldSpec);
- Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertFalse(SchemaUtils.validate(pinotSchema));
// time field spec using same name for incoming and outgoing
pinotSchema = new Schema();
timeFieldSpec = new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time"),
new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
"time"));
pinotSchema.addField(timeFieldSpec);
- Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertFalse(SchemaUtils.validate(pinotSchema));
// time field spec using SIMPLE_DATE_FORMAT, not allowed when conversion
is needed
pinotSchema = new Schema();
@@ -138,21 +138,21 @@ public class SchemaFieldExtractorUtilsTest {
new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(),
"outgoing"));
pinotSchema.addField(timeFieldSpec);
- Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertFalse(SchemaUtils.validate(pinotSchema));
// incorrect groovy function syntax
pinotSchema = new Schema();
dimensionFieldSpec = new DimensionFieldSpec("dim1",
FieldSpec.DataType.STRING, true);
dimensionFieldSpec.setTransformFunction("Groovy(function, argument3)");
pinotSchema.addField(dimensionFieldSpec);
- Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertFalse(SchemaUtils.validate(pinotSchema));
// valid schema, empty arguments
pinotSchema = new Schema();
dimensionFieldSpec = new DimensionFieldSpec("dim1",
FieldSpec.DataType.STRING, true);
dimensionFieldSpec.setTransformFunction("Groovy({function})");
pinotSchema.addField(dimensionFieldSpec);
- Assert.assertTrue(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertTrue(SchemaUtils.validate(pinotSchema));
// valid schema
pinotSchema = new Schema();
@@ -165,7 +165,7 @@ public class SchemaFieldExtractorUtilsTest {
timeFieldSpec = new TimeFieldSpec("time", FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS);
timeFieldSpec.setTransformFunction("Groovy({function}, millis)");
pinotSchema.addField(timeFieldSpec);
- Assert.assertTrue(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertTrue(SchemaUtils.validate(pinotSchema));
// valid time field spec
pinotSchema = new Schema();
@@ -173,6 +173,6 @@ public class SchemaFieldExtractorUtilsTest {
new TimeFieldSpec(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "incoming"),
new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "outgoing"));
pinotSchema.addField(timeFieldSpec);
- Assert.assertTrue(SchemaFieldExtractorUtils.validate(pinotSchema));
+ Assert.assertTrue(SchemaUtils.validate(pinotSchema));
}
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index 0c7a51d..031f379 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -36,7 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -73,10 +73,10 @@ public class MapTypeClusterIntegrationTest extends BaseClusterIntegrationTest {
// Create and upload segments
File avroFile = createAvroFile();
Schema schema = new Schema.SchemaBuilder().setSchemaName(getTableName())
- .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
- .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
- .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX, DataType.INT)
- .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT).build();
+ .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
+ .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
+ .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.INT)
+ .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT).build();
ExecutorService executor = Executors.newCachedThreadPool();
ClusterIntegrationTestUtils
.buildSegmentsFromAvro(Collections.singletonList(avroFile), 0, _segmentDir, _tarDir, getTableName(), null, null,
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
index 8a43f61..fc9029c 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.inputformat.avro;
+import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
@@ -27,7 +28,6 @@ import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -47,7 +47,7 @@ public class AvroRecordToPinotRowGeneratorTest {
new org.apache.pinot.spi.data.Schema.SchemaBuilder().setSchemaName("testSchema")
.addTime("incomingTime", TimeUnit.MILLISECONDS,
FieldSpec.DataType.LONG, "outgoingTime", TimeUnit.DAYS,
FieldSpec.DataType.INT).build();
- Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(pinotSchema);
+ Set<String> sourceFields = Sets.newHashSet("incomingTime", "outgoingTime");
AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
avroRecordExtractor.init(sourceFields, null);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 457d762..bc16ea7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -39,7 +39,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
import org.apache.pinot.spi.utils.EqualityUtils;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -436,7 +435,7 @@ public final class Schema {
}
}
- return SchemaFieldExtractorUtils.validate(this);
+ return true;
}
public static class SchemaBuilder {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 680b808..f9034e8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -40,7 +40,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.tools.Command;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
@@ -365,7 +365,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
if (_readerConfigFile != null) {
readerConfig = JsonUtils.fileToObject(new File(_readerConfigFile), CSVRecordReaderConfig.class);
}
- csvRecordReader.init(localFile, schema, readerConfig, SchemaFieldExtractorUtils.extractSourceFields(schema));
+ csvRecordReader.init(localFile, schema, readerConfig, SchemaUtils.extractSourceFields(schema));
driver.init(config, csvRecordReader);
break;
default:
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
index 6048774..610c1c5 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.tools.config.validator;
+import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.spi.data.Schema;
@@ -27,6 +28,6 @@ public class SchemaValidator {
public static boolean validate(Schema schema) {
// TODO: ADD MORE VALIDATIONS.
- return schema.validate(null);
+ return SchemaUtils.validate(schema);
}
}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index f78ed77..620748c 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -36,7 +36,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.glassfish.tyrus.client.ClientManager;
@@ -68,7 +68,7 @@ public class MeetupRsvpStream {
try {
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
final StreamMessageDecoder decoder =
PluginManager.get().createInstance(KafkaStarterUtils.KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME);
- decoder.init(null, schema, null, SchemaFieldExtractorUtils.extractSourceFields(schema));
+ decoder.init(null, schema, null, SchemaUtils.extractSourceFields(schema));
client = ClientManager.createClient();
client.connectToServer(new Endpoint() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]