This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b9e8d96  Move ExpressionEvaluators and SchemaFieldExtractor from 
pinot-spi to pinot-core (#5310)
b9e8d96 is described below

commit b9e8d96776caf57c812bb2de92e2c654dff7df30
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Apr 27 16:17:41 2020 -0700

    Move ExpressionEvaluators and SchemaFieldExtractor from pinot-spi to 
pinot-core (#5310)
---
 .../api/resources/PinotSchemaRestletResource.java  |  9 +++---
 .../data/function}/DefaultTimeSpecEvaluator.java   |  2 +-
 .../core/data/function}/ExpressionEvaluator.java   |  2 +-
 .../data/function}/ExpressionEvaluatorFactory.java | 13 ++++----
 .../data/function}/GroovyExpressionEvaluator.java  |  2 +-
 .../realtime/HLRealtimeSegmentDataManager.java     |  4 +--
 .../realtime/LLRealtimeSegmentDataManager.java     |  4 +--
 .../manager/realtime/RealtimeTableDataManager.java |  3 +-
 .../recordtransformer/ExpressionTransformer.java   |  4 +--
 .../impl/SegmentIndexCreationDriverImpl.java       |  4 +--
 .../org/apache/pinot/core/util/SchemaUtils.java    | 37 ++++++++++++++--------
 .../function}/GroovyExpressionEvaluatorTest.java   |  2 +-
 .../ExpressionTransformerTimeTest.java             | 13 ++------
 .../impl/fakestream/FakeStreamConsumerFactory.java |  4 +--
 .../apache/pinot/core/util/SchemaUtilsTest.java    | 34 ++++++++++----------
 .../tests/MapTypeClusterIntegrationTest.java       | 10 +++---
 .../avro/AvroRecordToPinotRowGeneratorTest.java    |  4 +--
 .../java/org/apache/pinot/spi/data/Schema.java     |  3 +-
 .../tools/admin/command/CreateSegmentCommand.java  |  4 +--
 .../tools/config/validator/SchemaValidator.java    |  3 +-
 .../pinot/tools/streams/MeetupRsvpStream.java      |  4 +--
 21 files changed, 84 insertions(+), 81 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
index 362fb2e..12841d6 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSchemaRestletResource.java
@@ -48,6 +48,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory;
 import org.apache.pinot.controller.api.events.SchemaEventType;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -169,7 +170,7 @@ public class PinotSchemaRestletResource {
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Successfully 
validated schema"), @ApiResponse(code = 400, message = "Missing or invalid 
request body"), @ApiResponse(code = 500, message = "Internal error")})
   public String validateSchema(FormDataMultiPart multiPart) {
     Schema schema = getSchemaFromMultiPart(multiPart);
-    if (!schema.validate(LOGGER)) {
+    if (!SchemaUtils.validate(schema, LOGGER)) {
       throw new ControllerApplicationException(LOGGER, "Invalid schema. Check 
controller logs",
           Response.Status.BAD_REQUEST);
     }
@@ -184,7 +185,7 @@ public class PinotSchemaRestletResource {
       + "from 'GET /schema/{schemaName}'. This allows us to validate schema 
before apply.")
   @ApiResponses(value = {@ApiResponse(code = 200, message = "Successfully 
validated schema"), @ApiResponse(code = 400, message = "Missing or invalid 
request body"), @ApiResponse(code = 500, message = "Internal error")})
   public String validateSchema(Schema schema) {
-    if (!schema.validate(LOGGER)) {
+    if (!SchemaUtils.validate(schema, LOGGER)) {
       throw new ControllerApplicationException(LOGGER, "Invalid schema. Check 
controller logs",
           Response.Status.BAD_REQUEST);
     }
@@ -198,7 +199,7 @@ public class PinotSchemaRestletResource {
    * @return
    */
   private SuccessResponse addSchema(Schema schema, boolean override) {
-    if (!schema.validate(LOGGER)) {
+    if (!SchemaUtils.validate(schema, LOGGER)) {
       throw new ControllerApplicationException(LOGGER, "Cannot add invalid 
schema " + schema.getSchemaName(),
           Response.Status.BAD_REQUEST);
     }
@@ -226,7 +227,7 @@ public class PinotSchemaRestletResource {
    * @return
    */
   private SuccessResponse updateSchema(String schemaName, Schema schema, 
boolean reload) {
-    if (!schema.validate(LOGGER)) {
+    if (!SchemaUtils.validate(schema, LOGGER)) {
       throw new ControllerApplicationException(LOGGER, "Cannot add invalid 
schema: " + schemaName,
           Response.Status.BAD_REQUEST);
     }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/DefaultTimeSpecEvaluator.java
similarity index 98%
rename from 
pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/function/DefaultTimeSpecEvaluator.java
index dcb242d..a0466a0 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/DefaultTimeSpecEvaluator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
 
 import com.google.common.base.Preconditions;
 import java.util.Collections;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluator.java
similarity index 96%
rename from 
pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluator.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluator.java
index e2bf8c4..dc51f0b 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
 
 import java.util.List;
 import org.apache.pinot.spi.data.readers.GenericRow;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluatorFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluatorFactory.java
similarity index 90%
rename from 
pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluatorFactory.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluatorFactory.java
index db73f13..0959581 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/ExpressionEvaluatorFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/ExpressionEvaluatorFactory.java
@@ -16,13 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
 
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -78,18 +78,17 @@ public class ExpressionEvaluatorFactory {
         }
       }
 
-    } else if 
(columnName.endsWith(SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX)) {
+    } else if (columnName.endsWith(SchemaUtils.MAP_KEY_COLUMN_SUFFIX)) {
 
       // for backward compatible handling of Map type (currently only in Avro)
       String sourceMapName =
-          columnName.substring(0, columnName.length() - 
SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX.length());
+          columnName.substring(0, columnName.length() - 
SchemaUtils.MAP_KEY_COLUMN_SUFFIX.length());
       String defaultMapKeysTransformExpression = 
getDefaultMapKeysTransformExpression(sourceMapName);
       expressionEvaluator = 
getExpressionEvaluator(defaultMapKeysTransformExpression);
-    } else if 
(columnName.endsWith(SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX)) {
-
+    } else if (columnName.endsWith(SchemaUtils.MAP_VALUE_COLUMN_SUFFIX)) {
       // for backward compatible handling of Map type in avro (currently only 
in Avro)
       String sourceMapName =
-          columnName.substring(0, columnName.length() - 
SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX.length());
+          columnName.substring(0, columnName.length() - 
SchemaUtils.MAP_VALUE_COLUMN_SUFFIX.length());
       String defaultMapValuesTransformExpression = 
getDefaultMapValuesTransformExpression(sourceMapName);
       expressionEvaluator = 
getExpressionEvaluator(defaultMapValuesTransformExpression);
     }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluator.java
similarity index 98%
rename from 
pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluator.java
rename to 
pinot-core/src/main/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluator.java
index 68ecfa6..b0362c9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluator.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index a2de83b..1edab27 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,7 +51,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -168,7 +168,7 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     // create and init stream level consumer
     _streamConsumerFactory = 
StreamConsumerFactoryProvider.create(_streamConfig);
     String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" 
+ _streamConfig.getTopicName();
-    Set<String> sourceFields = 
SchemaFieldExtractorUtils.extractSourceFields(schema);
+    Set<String> sourceFields = SchemaUtils.extractSourceFields(schema);
     _streamLevelConsumer = _streamConsumerFactory
         .createStreamLevelConsumer(clientId, tableNameWithType, schema, 
instanceMetadata.getGroupId(tableNameWithType),
             sourceFields);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 628a4d7..35aec2f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -76,7 +76,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.TransientConsumerException;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 import org.slf4j.Logger;
@@ -1153,7 +1153,7 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
             .setConsumerDir(consumerDir);
 
     // Create message decoder
-    _sourceFields = SchemaFieldExtractorUtils.extractSourceFields(_schema);
+    _sourceFields = SchemaUtils.extractSourceFields(_schema);
     _messageDecoder = 
StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema, 
_sourceFields);
     _clientId = _streamTopic + "-" + _streamPartitionId;
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 0143eed..697ce5a 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -51,6 +51,7 @@ import 
org.apache.pinot.core.realtime.impl.RealtimeSegmentStatsHistory;
 import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.segment.index.loader.LoaderUtils;
 import 
org.apache.pinot.core.segment.virtualcolumn.VirtualColumnProviderFactory;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.FieldSpec;
@@ -353,7 +354,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       }
     }
     // 2. We want to get the schema errors, if any, even if isValid is false;
-    if (!schema.validate(_logger)) {
+    if (!SchemaUtils.validate(schema, _logger)) {
       isValid = false;
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
index 2619dfb..facdcf7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformer.java
@@ -23,8 +23,8 @@ import java.util.Map;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
-import 
org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
+import org.apache.pinot.core.data.function.ExpressionEvaluator;
+import org.apache.pinot.core.data.function.ExpressionEvaluatorFactory;
 
 
 /**
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 2249271..5645c47 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -57,7 +57,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.data.readers.RecordReaderFactory;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,7 +98,7 @@ public class SegmentIndexCreationDriverImpl implements 
SegmentIndexCreationDrive
     Schema schema = segmentGeneratorConfig.getSchema();
     FileFormat fileFormat = segmentGeneratorConfig.getFormat();
     String recordReaderClassName = 
segmentGeneratorConfig.getRecordReaderPath();
-    Set<String> fields = 
SchemaFieldExtractorUtils.extractSourceFields(segmentGeneratorConfig.getSchema());
+    Set<String> fields = 
SchemaUtils.extractSourceFields(segmentGeneratorConfig.getSchema());
 
     // Allow for instantiation general record readers from a record reader 
path passed into segment generator config
     // If this is set, this will override the file format
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
 b/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
similarity index 80%
rename from 
pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
rename to pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
index 0c34046..d152647 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/SchemaUtils.java
@@ -16,14 +16,13 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.utils;
+package org.apache.pinot.core.util;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
-import 
org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
+import org.apache.pinot.core.data.function.ExpressionEvaluator;
+import org.apache.pinot.core.data.function.ExpressionEvaluatorFactory;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -33,13 +32,14 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Extracts names of the source fields from the schema
+ * Schema utils
+ * FIXME: Merge this SchemaUtils with the SchemaUtils from pinot-common when 
merging of modules happens
  */
-public class SchemaFieldExtractorUtils {
+public class SchemaUtils {
   public static final String MAP_KEY_COLUMN_SUFFIX = "__KEYS";
   public static final String MAP_VALUE_COLUMN_SUFFIX = "__VALUES";
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaFieldExtractorUtils.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(SchemaUtils.class);
 
   /**
    * Extracts the source fields and destination fields from the schema
@@ -67,6 +67,16 @@ public class SchemaFieldExtractorUtils {
    * i.e. do not allow using source column name for destination column
    */
   public static boolean validate(Schema schema) {
+    return validate(schema, LOGGER);
+  }
+
+  /**
+   * Validates the following:
+   * 1) for a field spec with transform function, the source column name and 
destination column name are exclusive
+   * i.e. do not allow using source column name for destination column
+   * 2) Basic schema validations
+   */
+  public static boolean validate(Schema schema, Logger logger) {
     try {
       for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
         if (!fieldSpec.isVirtualColumn()) {
@@ -78,7 +88,7 @@ public class SchemaFieldExtractorUtils {
               List<String> arguments = expressionEvaluator.getArguments();
               // output column used as input
               if (arguments.contains(column)) {
-                LOGGER.error("The arguments of transform function: {}, should 
not contain the destination column: {}",
+                logger.error("The arguments of transform function: {}, should 
not contain the destination column: {}",
                     transformFunction, column);
                 return false;
               }
@@ -91,13 +101,14 @@ public class SchemaFieldExtractorUtils {
             if (!incomingGranularitySpec.equals(outgoingGranularitySpec)) {
               // different incoming and outgoing spec, but same name
               if 
(incomingGranularitySpec.getName().equals(outgoingGranularitySpec.getName())) {
-                LOGGER.error("Cannot convert from incoming field spec:{} to 
outgoing field spec:{} if name is the same",
+                logger.error("Cannot convert from incoming field spec:{} to 
outgoing field spec:{} if name is the same",
                     incomingGranularitySpec, outgoingGranularitySpec);
                 return false;
               } else {
-                if 
(!incomingGranularitySpec.getTimeFormat().equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())
 || !outgoingGranularitySpec.getTimeFormat()
+                if 
(!incomingGranularitySpec.getTimeFormat().equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())
+                    || !outgoingGranularitySpec.getTimeFormat()
                     .equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) {
-                  LOGGER.error(
+                  logger.error(
                       "When incoming and outgoing specs are different, cannot 
perform time conversion for time format other than EPOCH");
                   return false;
                 }
@@ -107,9 +118,9 @@ public class SchemaFieldExtractorUtils {
         }
       }
     } catch (Exception e) {
-      LOGGER.error("Exception in validating schema {}", 
schema.getSchemaName(), e);
+      logger.error("Exception in validating schema {}", 
schema.getSchemaName(), e);
       return false;
     }
-    return true;
+    return schema.validate(logger);
   }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluatorTest.java
similarity index 98%
rename from 
pinot-spi/src/test/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluatorTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluatorTest.java
index 7d24725..0ff8fad 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/data/function/evaluators/GroovyExpressionEvaluatorTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/function/GroovyExpressionEvaluatorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.data.function.evaluators;
+package org.apache.pinot.core.data.function;
 
 import java.util.ArrayList;
 import java.util.HashMap;
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
index 2e1bbbf..aa86b69 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/recordtransformer/ExpressionTransformerTimeTest.java
@@ -82,9 +82,10 @@ public class ExpressionTransformerTimeTest {
 
     // 3] both incoming and outgoing defined - same column name only - skip 
conversion - this shouldn't be allowed by validation during add schema
     try {
-      new Schema.SchemaBuilder()
+      pinotSchema = new Schema.SchemaBuilder()
           .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "time"),
               new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, 
"time")).build();
+      new ExpressionTransformer(pinotSchema);
       Assert.fail();
     } catch (Exception e) {
       // expected
@@ -168,15 +169,5 @@ public class ExpressionTransformerTimeTest {
     genericRow.putValue("time", 20180101);
     expressionTransformer.transform(genericRow);
     Assert.assertEquals(genericRow.getValue("time"), 20180101);
-
-    // When incoming and outgoing spec are not the same, simple date format is 
not allowed
-    try {
-      new Schema.SchemaBuilder().addTime(new 
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
-              TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(), 
"incoming"),
-          new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.SECONDS, 
"outgoing")).build();
-      fail();
-    } catch (Exception e) {
-      // Expected
-    }
   }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index a573d58..40356c9 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -31,7 +31,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
 import org.apache.pinot.spi.stream.StreamLevelConsumer;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 
 
 /**
@@ -94,7 +94,7 @@ public class FakeStreamConsumerFactory extends 
StreamConsumerFactory {
     // Message decoder
     Schema pinotSchema = FakeStreamConfigUtils.getPinotSchema();
     StreamMessageDecoder streamMessageDecoder = 
StreamDecoderProvider.create(streamConfig, pinotSchema,
-        SchemaFieldExtractorUtils.extractSourceFields(pinotSchema));
+        SchemaUtils.extractSourceFields(pinotSchema));
     GenericRow decodedRow = new GenericRow();
     streamMessageDecoder.decode(messageBatch.getMessageAtIndex(0), decodedRow);
     System.out.println(decodedRow);
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
 b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
similarity index 84%
rename from 
pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
rename to 
pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 7c1f038..43f1812 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.spi.utils;
+package org.apache.pinot.core.util;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -35,7 +35,7 @@ import org.testng.annotations.Test;
 /**
  * Tests that the source field names are extracted correctly
  */
-public class SchemaFieldExtractorUtilsTest {
+public class SchemaUtilsTest {
 
   @Test
   public void testSourceFieldExtractorName() {
@@ -48,7 +48,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1, 
argument2)");
     schema.addField(dimensionFieldSpec);
 
-    List<String> extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+    List<String> extract = new 
ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 3);
     Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1", 
"argument2")));
 
@@ -58,7 +58,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec.setTransformFunction("Groovy({function})");
     schema.addField(dimensionFieldSpec);
 
-    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+    extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("d1"));
 
@@ -67,7 +67,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec = new DimensionFieldSpec("map__KEYS", 
FieldSpec.DataType.INT, false);
     schema.addField(dimensionFieldSpec);
 
-    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+    extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__KEYS")));
 
@@ -76,7 +76,7 @@ public class SchemaFieldExtractorUtilsTest {
     dimensionFieldSpec = new DimensionFieldSpec("map__VALUES", 
FieldSpec.DataType.LONG, false);
     schema.addField(dimensionFieldSpec);
 
-    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+    extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("map", 
"map__VALUES")));
 
@@ -86,7 +86,7 @@ public class SchemaFieldExtractorUtilsTest {
     TimeFieldSpec timeFieldSpec = new TimeFieldSpec("time", 
FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS);
     schema.addField(timeFieldSpec);
 
-    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+    extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 1);
     Assert.assertTrue(extract.contains("time"));
 
@@ -97,7 +97,7 @@ public class SchemaFieldExtractorUtilsTest {
             TimeUnit.MILLISECONDS);
     schema.addField(timeFieldSpec);
 
-    extract = new 
ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
+    extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out")));
   }
@@ -110,26 +110,26 @@ public class SchemaFieldExtractorUtilsTest {
     DimensionFieldSpec dimensionFieldSpec = new DimensionFieldSpec("dim1", 
FieldSpec.DataType.STRING, true);
     dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1, 
dim1, argument3)");
     pinotSchema.addField(dimensionFieldSpec);
-    Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
     pinotSchema = new Schema();
     MetricFieldSpec metricFieldSpec = new MetricFieldSpec("m1", 
FieldSpec.DataType.LONG);
     metricFieldSpec.setTransformFunction("Groovy({function}, m1, m1)");
     pinotSchema.addField(metricFieldSpec);
-    Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
     pinotSchema = new Schema();
     TimeFieldSpec timeFieldSpec = new TimeFieldSpec("time", 
FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS);
     timeFieldSpec.setTransformFunction("Groovy({function}, time)");
     pinotSchema.addField(timeFieldSpec);
-    Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
     // time field spec using same name for incoming and outgoing
     pinotSchema = new Schema();
     timeFieldSpec = new TimeFieldSpec(new 
TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time"),
         new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"time"));
     pinotSchema.addField(timeFieldSpec);
-    Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
     // time field spec using SIMPLE_DATE_FORMAT, not allowed when conversion 
is needed
     pinotSchema = new Schema();
@@ -138,21 +138,21 @@ public class SchemaFieldExtractorUtilsTest {
             new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
                 TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(), 
"outgoing"));
     pinotSchema.addField(timeFieldSpec);
-    Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
     // incorrect groovy function syntax
     pinotSchema = new Schema();
     dimensionFieldSpec = new DimensionFieldSpec("dim1", 
FieldSpec.DataType.STRING, true);
     dimensionFieldSpec.setTransformFunction("Groovy(function, argument3)");
     pinotSchema.addField(dimensionFieldSpec);
-    Assert.assertFalse(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
     // valid schema, empty arguments
     pinotSchema = new Schema();
     dimensionFieldSpec = new DimensionFieldSpec("dim1", 
FieldSpec.DataType.STRING, true);
     dimensionFieldSpec.setTransformFunction("Groovy({function})");
     pinotSchema.addField(dimensionFieldSpec);
-    Assert.assertTrue(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertTrue(SchemaUtils.validate(pinotSchema));
 
     // valid schema
     pinotSchema = new Schema();
@@ -165,7 +165,7 @@ public class SchemaFieldExtractorUtilsTest {
     timeFieldSpec = new TimeFieldSpec("time", FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS);
     timeFieldSpec.setTransformFunction("Groovy({function}, millis)");
     pinotSchema.addField(timeFieldSpec);
-    Assert.assertTrue(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertTrue(SchemaUtils.validate(pinotSchema));
 
     // valid time field spec
     pinotSchema = new Schema();
@@ -173,6 +173,6 @@ public class SchemaFieldExtractorUtilsTest {
         new TimeFieldSpec(new TimeGranularitySpec(FieldSpec.DataType.LONG, 
TimeUnit.MILLISECONDS, "incoming"),
             new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, 
"outgoing"));
     pinotSchema.addField(timeFieldSpec);
-    Assert.assertTrue(SchemaFieldExtractorUtils.validate(pinotSchema));
+    Assert.assertTrue(SchemaUtils.validate(pinotSchema));
   }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
index 0c7a51d..031f379 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MapTypeClusterIntegrationTest.java
@@ -36,7 +36,7 @@ import org.apache.avro.generic.GenericDatumWriter;
 import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
@@ -73,10 +73,10 @@ public class MapTypeClusterIntegrationTest extends 
BaseClusterIntegrationTest {
     // Create and upload segments
     File avroFile = createAvroFile();
     Schema schema = new Schema.SchemaBuilder().setSchemaName(getTableName())
-        .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + 
SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
-        .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + 
SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
-        .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + 
SchemaFieldExtractorUtils.MAP_KEY_COLUMN_SUFFIX, DataType.INT)
-        .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + 
SchemaFieldExtractorUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT).build();
+        .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + 
SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.STRING)
+        .addMultiValueDimension(STRING_KEY_MAP_FIELD_NAME + 
SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT)
+        .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + 
SchemaUtils.MAP_KEY_COLUMN_SUFFIX, DataType.INT)
+        .addMultiValueDimension(INT_KEY_MAP_FIELD_NAME + 
SchemaUtils.MAP_VALUE_COLUMN_SUFFIX, DataType.INT).build();
     ExecutorService executor = Executors.newCachedThreadPool();
     ClusterIntegrationTestUtils
         .buildSegmentsFromAvro(Collections.singletonList(avroFile), 0, 
_segmentDir, _tarDir, getTableName(), null, null,
diff --git 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
index 8a43f61..fc9029c 100644
--- 
a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
+++ 
b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.plugin.inputformat.avro;
 
+import com.google.common.collect.Sets;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -27,7 +28,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -47,7 +47,7 @@ public class AvroRecordToPinotRowGeneratorTest {
         new 
org.apache.pinot.spi.data.Schema.SchemaBuilder().setSchemaName("testSchema")
             .addTime("incomingTime", TimeUnit.MILLISECONDS, 
FieldSpec.DataType.LONG, "outgoingTime", TimeUnit.DAYS,
                 FieldSpec.DataType.INT).build();
-    Set<String> sourceFields = 
SchemaFieldExtractorUtils.extractSourceFields(pinotSchema);
+    Set<String> sourceFields = Sets.newHashSet("incomingTime", "outgoingTime");
 
     AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
     avroRecordExtractor.init(sourceFields, null);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 457d762..bc16ea7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -39,7 +39,6 @@ import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.data.FieldSpec.FieldType;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -436,7 +435,7 @@ public final class Schema {
       }
     }
 
-    return SchemaFieldExtractorUtils.validate(this);
+    return true;
   }
 
   public static class SchemaBuilder {
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 680b808..f9034e8 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -40,7 +40,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.filesystem.PinotFSFactory;
 import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.tools.Command;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
@@ -365,7 +365,7 @@ public class CreateSegmentCommand extends 
AbstractBaseAdminCommand implements Co
                   if (_readerConfigFile != null) {
                     readerConfig = JsonUtils.fileToObject(new 
File(_readerConfigFile), CSVRecordReaderConfig.class);
                   }
-                  csvRecordReader.init(localFile, schema, readerConfig, 
SchemaFieldExtractorUtils.extractSourceFields(schema));
+                  csvRecordReader.init(localFile, schema, readerConfig, 
SchemaUtils.extractSourceFields(schema));
                   driver.init(config, csvRecordReader);
                   break;
                 default:
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
index 6048774..610c1c5 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/config/validator/SchemaValidator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.tools.config.validator;
 
+import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.spi.data.Schema;
 
 
@@ -27,6 +28,6 @@ public class SchemaValidator {
 
   public static boolean validate(Schema schema) {
     // TODO: ADD MORE VALIDATIONS.
-    return schema.validate(null);
+    return SchemaUtils.validate(schema);
   }
 }
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index f78ed77..620748c 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -36,7 +36,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.stream.StreamDataProducer;
 import org.apache.pinot.spi.stream.StreamDataProvider;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
+import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.glassfish.tyrus.client.ClientManager;
 
@@ -68,7 +68,7 @@ public class MeetupRsvpStream {
     try {
       final ClientEndpointConfig cec = 
ClientEndpointConfig.Builder.create().build();
       final StreamMessageDecoder decoder = 
PluginManager.get().createInstance(KafkaStarterUtils.KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME);
-      decoder.init(null, schema, null, 
SchemaFieldExtractorUtils.extractSourceFields(schema));
+      decoder.init(null, schema, null, 
SchemaUtils.extractSourceFields(schema));
       client = ClientManager.createClient();
       client.connectToServer(new Endpoint() {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to