This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new bfddb7a  Pull the source fields Set out of the RecordReader/Decoder (#5303)
bfddb7a is described below
commit bfddb7a34dc60c80c059facaf0df3f7247fe81f9
Author: Neha Pawar <[email protected]>
AuthorDate: Mon Apr 27 11:48:53 2020 -0700
Pull the source fields Set out of the RecordReader/Decoder (#5303)
Add the source field names Set to the init of the RecordReader and
Decoder, thus removing its extraction from inside the Reader/Decoder.
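
For context, the interface change looks roughly like this (a minimal sketch
inferred from the diff below; names follow the patched code, but the snippet
is illustrative rather than an exact call site):

    // Before: each reader/decoder derived the source fields from the schema internally.
    // recordReader.init(dataFile, schema, recordReaderConfig);

    // After: the caller extracts the source field names once and passes them in.
    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    recordReader.init(dataFile, schema, recordReaderConfig, sourceFields);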
---
.../realtime/HLRealtimeSegmentDataManager.java | 5 +-
.../realtime/LLRealtimeSegmentDataManager.java | 5 +-
.../core/data/readers/GenericRowRecordReader.java | 3 +-
.../readers/MultiplePinotSegmentRecordReader.java | 2 +-
.../data/readers/PinotSegmentRecordReader.java | 3 +-
.../pinot/core/minion/BackfillDateTimeColumn.java | 199 ----------------
.../apache/pinot/core/minion/SegmentPurger.java | 4 +-
.../core/minion/segment/MapperRecordReader.java | 3 +-
.../core/minion/segment/ReducerRecordReader.java | 3 +-
.../converter/RealtimeSegmentRecordReader.java | 3 +-
.../impl/SegmentIndexCreationDriverImpl.java | 7 +-
.../readers/BackfillDateTimeRecordReaderTest.java | 256 ---------------------
.../data/readers/RecordReaderSampleDataTest.java | 35 +--
.../MutableSegmentImplNullValueVectorTest.java | 3 +-
.../mutable/MutableSegmentImplTest.java | 3 +-
.../impl/fakestream/FakeStreamConsumerFactory.java | 7 +-
.../impl/fakestream/FakeStreamMessageDecoder.java | 3 +-
.../pinot/integration/tests/ClusterTest.java | 4 +-
...lakyConsumerRealtimeClusterIntegrationTest.java | 11 +-
.../batch/common/SegmentGenerationTaskRunner.java | 11 +-
.../plugin/inputformat/avro/AvroRecordReader.java | 4 +-
.../avro/AvroRecordExtractorMapTypeTest.java | 9 +-
.../inputformat/avro/AvroRecordExtractorTest.java | 2 +-
.../inputformat/avro/AvroRecordReaderTest.java | 2 +-
.../avro/AvroRecordToPinotRowGeneratorTest.java | 2 +-
.../inputformat/avro/KafkaAvroMessageDecoder.java | 8 +-
.../inputformat/avro/SimpleAvroMessageDecoder.java | 8 +-
...aConfluentSchemaRegistryAvroMessageDecoder.java | 6 +-
.../plugin/inputformat/csv/CSVRecordReader.java | 4 +-
.../inputformat/csv/CSVRecordExtractorTest.java | 11 +-
.../inputformat/csv/CSVRecordReaderTest.java | 2 +-
.../plugin/inputformat/json/JSONRecordReader.java | 4 +-
.../inputformat/json/JSONRecordExtractorTest.java | 2 +-
.../inputformat/json/JSONRecordReaderTest.java | 2 +-
.../plugin/inputformat/orc/ORCRecordReader.java | 6 +-
.../inputformat/orc/ORCRecordExtractorTest.java | 2 +-
.../inputformat/orc/ORCRecordReaderTest.java | 2 +-
.../inputformat/parquet/ParquetRecordReader.java | 4 +-
.../parquet/ParquetRecordExtractorTest.java | 2 +-
.../parquet/ParquetRecordReaderTest.java | 2 +-
.../inputformat/thrift/ThriftRecordReader.java | 4 +-
.../inputformat/thrift/ThriftRecordReaderTest.java | 9 +-
.../stream/kafka09/KafkaConsumerFactory.java | 5 +-
.../stream/kafka09/KafkaStreamLevelConsumer.java | 5 +-
.../stream/kafka20/KafkaConsumerFactory.java | 5 +-
.../stream/kafka20/KafkaStreamLevelConsumer.java | 5 +-
.../stream/kafka/KafkaJSONMessageDecoder.java | 10 +-
.../stream/kafka/KafkaJSONMessageDecoderTest.java | 2 +-
.../evaluators/DefaultTimeSpecEvaluator.java | 1 -
.../pinot/spi/data/readers/RecordReader.java | 6 +-
.../spi/data/readers/RecordReaderFactory.java | 13 +-
.../pinot/spi/stream/StreamConsumerFactory.java | 6 +-
.../pinot/spi/stream/StreamDecoderProvider.java | 12 +-
.../pinot/spi/stream/StreamMessageDecoder.java | 12 +-
.../pinot/spi/utils/SchemaFieldExtractorUtils.java | 24 +-
.../data/readers/AbstractRecordExtractorTest.java | 11 +-
.../spi/data/readers/AbstractRecordReaderTest.java | 8 +
.../apache/pinot/spi/plugin/PluginManagerTest.java | 2 +-
.../spi/utils/SchemaFieldExtractorUtilsTest.java | 13 +-
pinot-spi/src/test/resources/TestRecordReader.java | 3 +-
.../pinot/tools/admin/PinotAdministrator.java | 2 -
.../command/BackfillDateTimeColumnCommand.java | 230 ------------------
.../tools/admin/command/CreateSegmentCommand.java | 3 +-
.../pinot/tools/streams/MeetupRsvpStream.java | 3 +-
64 files changed, 200 insertions(+), 848 deletions(-)
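
The same set is threaded through the streaming path as well; a sketch
distilled from the decoder and consumer hunks below (illustrative, not a
verbatim call site):

    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    StreamMessageDecoder decoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);
    StreamLevelConsumer consumer = streamConsumerFactory
        .createStreamLevelConsumer(clientId, tableNameWithType, schema, groupId, sourceFields);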
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 98f127b..a2de83b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -51,6 +51,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -167,8 +168,10 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
// create and init stream level consumer
_streamConsumerFactory = StreamConsumerFactoryProvider.create(_streamConfig);
String clientId = HLRealtimeSegmentDataManager.class.getSimpleName() + "-" + _streamConfig.getTopicName();
+ Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
_streamLevelConsumer = _streamConsumerFactory
- .createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata.getGroupId(tableNameWithType));
+ .createStreamLevelConsumer(clientId, tableNameWithType, schema, instanceMetadata.getGroupId(tableNameWithType),
+ sourceFields);
_streamLevelConsumer.start();
tableStreamName = tableNameWithType + "_" + _streamConfig.getTopicName();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 4779697..628a4d7 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -76,6 +76,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.TransientConsumerException;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
@@ -201,6 +202,7 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
private final TableConfig _tableConfig;
private final RealtimeTableDataManager _realtimeTableDataManager;
private final StreamMessageDecoder _messageDecoder;
+ private final Set<String> _sourceFields;
private final int _segmentMaxRowCount;
private final String _resourceDataDir;
private final IndexLoadingConfig _indexLoadingConfig;
@@ -1151,7 +1153,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
.setConsumerDir(consumerDir);
// Create message decoder
- _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema);
+ _sourceFields = SchemaFieldExtractorUtils.extractSourceFields(_schema);
+ _messageDecoder = StreamDecoderProvider.create(_partitionLevelStreamConfig, _schema, _sourceFields);
_clientId = _streamTopic + "-" + _streamPartitionId;
// Create record transformer
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
index fecf96d..c6f1377 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/GenericRowRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.readers;
import java.io.File;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
@@ -44,7 +45,7 @@ public class GenericRowRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
index 92f0bc7..23a17b0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReader.java
@@ -110,7 +110,7 @@ public class MultiplePinotSegmentRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
index c146b0e..0205607 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/readers/PinotSegmentRecordReader.java
@@ -24,6 +24,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.pinot.common.segment.ReadMode;
@@ -117,7 +118,7 @@ public class PinotSegmentRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
deleted file mode 100644
index 69d9b34..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/BackfillDateTimeColumn.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.minion;
-
-import com.google.common.base.Preconditions;
-import java.io.File;
-import java.io.IOException;
-import javax.annotation.Nullable;
-import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
-import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import
org.apache.pinot.core.segment.creator.RecordReaderSegmentCreationDataSource;
-import
org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.FileFormat;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * The <code>BackfillDateTimeColumn</code> class takes a segment, a timeSpec
from the segment, and a
- * dateTimeSpec.
- * It creates a new segment with a new column corresponding to the
dateTimeSpec configs, using the values from the timeSpec
- * <ul>
- * <li>If a column corresponding to the dateTimeSpec already exists, it is
overwritten</li>
- * <li>If not, a new date time column is created</li>
- * <li>If the segment contains star tree, it is recreated, putting date time
column at the end</li>
- * </ul>
- * <p>
- */
-public class BackfillDateTimeColumn {
- private static final Logger LOGGER =
LoggerFactory.getLogger(BackfillDateTimeColumn.class);
-
- private final String _rawTableName;
- private final File _originalIndexDir;
- private final File _backfilledIndexDir;
- private final TimeFieldSpec _srcTimeFieldSpec;
- private final DateTimeFieldSpec _destDateTimeFieldSpec;
-
- public BackfillDateTimeColumn(String rawTableName, File originalIndexDir,
File backfilledIndexDir,
- TimeFieldSpec srcTimeSpec, DateTimeFieldSpec destDateTimeSpec) {
- _rawTableName = rawTableName;
- _originalIndexDir = originalIndexDir;
- _backfilledIndexDir = backfilledIndexDir;
-
Preconditions.checkArgument(!_originalIndexDir.getAbsolutePath().equals(_backfilledIndexDir.getAbsolutePath()),
- "Original index dir and backfill index dir should not be the same");
- _srcTimeFieldSpec = srcTimeSpec;
- _destDateTimeFieldSpec = destDateTimeSpec;
- }
-
- public boolean backfill()
- throws Exception {
- SegmentMetadataImpl originalSegmentMetadata = new
SegmentMetadataImpl(_originalIndexDir);
- String segmentName = originalSegmentMetadata.getName();
- LOGGER.info("Start backfilling segment: {} in table: {}", segmentName,
_rawTableName);
-
- PinotSegmentRecordReader segmentRecordReader = new
PinotSegmentRecordReader(_originalIndexDir);
- BackfillDateTimeRecordReader wrapperReader =
- new BackfillDateTimeRecordReader(segmentRecordReader,
_srcTimeFieldSpec, _destDateTimeFieldSpec);
- LOGGER.info("Segment dir: {} Output Dir: {}",
_originalIndexDir.getAbsolutePath(),
- _backfilledIndexDir.getAbsolutePath());
-
- LOGGER.info("Creating segment generator config for {}", segmentName);
- SegmentGeneratorConfig config = new SegmentGeneratorConfig();
- config.setFormat(FileFormat.PINOT);
- config.setOutDir(_backfilledIndexDir.getAbsolutePath());
- config.setOverwrite(true);
- config.setTableName(_rawTableName);
- config.setSegmentName(segmentName);
- config.setSchema(wrapperReader.getSchema());
-
- LOGGER.info("Creating segment for {} with config {}", segmentName,
config.toString());
- SegmentIndexCreationDriverImpl driver = new
SegmentIndexCreationDriverImpl();
- driver.init(config, new
RecordReaderSegmentCreationDataSource(wrapperReader),
- CompositeTransformer.getPassThroughTransformer());
- driver.build();
-
- return true;
- }
-
- public BackfillDateTimeRecordReader
getBackfillDateTimeRecordReader(RecordReader baseRecordReader) {
- return new BackfillDateTimeRecordReader(baseRecordReader,
_srcTimeFieldSpec, _destDateTimeFieldSpec);
- }
-
- /**
- * This record reader is a wrapper over another record reader.
- * It simply reads the records from the base record reader, and adds a new
field according to the
- * dateTimeFieldSpec
- */
- public class BackfillDateTimeRecordReader implements RecordReader {
- private final RecordReader _baseRecordReader;
- private final TimeFieldSpec _timeFieldSpec;
- private final DateTimeFieldSpec _dateTimeFieldSpec;
- private final Schema _schema;
-
- public BackfillDateTimeRecordReader(RecordReader baseRecordReader,
TimeFieldSpec timeFieldSpec,
- DateTimeFieldSpec dateTimeFieldSpec) {
- _baseRecordReader = baseRecordReader;
- _timeFieldSpec = timeFieldSpec;
- _dateTimeFieldSpec = dateTimeFieldSpec;
- _schema = baseRecordReader.getSchema();
-
- // Add/replace the date time field spec to the schema
- _schema.removeField(_dateTimeFieldSpec.getName());
- _schema.addField(_dateTimeFieldSpec);
- }
-
- @Override
- public void init(File dataFile, Schema schema, @Nullable
RecordReaderConfig recordReaderConfig) {
- }
-
- @Override
- public boolean hasNext() {
- return _baseRecordReader.hasNext();
- }
-
- @Override
- public GenericRow next()
- throws IOException {
- return next(new GenericRow());
- }
-
- /**
- * Reads the next row from the baseRecordReader, and adds a
dateTimeFieldSPec column to it
- * {@inheritDoc}
- * @see RecordReader#next(GenericRow)
- */
- @Override
- public GenericRow next(GenericRow reuse)
- throws IOException {
- reuse = _baseRecordReader.next(reuse);
- Long timeColumnValue = (Long) reuse.getValue(_timeFieldSpec.getName());
- Object dateTimeColumnValue =
convertTimeFieldToDateTimeFieldSpec(timeColumnValue);
- reuse.putField(_dateTimeFieldSpec.getName(), dateTimeColumnValue);
- return reuse;
- }
-
- /**
- * Converts the time column value from timeFieldSpec to dateTimeFieldSpec
- * @param timeColumnValue - time column value from timeFieldSpec
- * @return
- */
- private Object convertTimeFieldToDateTimeFieldSpec(Object timeColumnValue)
{
- TimeGranularitySpec timeGranularitySpec =
_timeFieldSpec.getOutgoingGranularitySpec();
-
- DateTimeFormatSpec formatFromTimeSpec =
- new DateTimeFormatSpec(timeGranularitySpec.getTimeUnitSize(),
timeGranularitySpec.getTimeType().toString(),
- timeGranularitySpec.getTimeFormat());
- if
(formatFromTimeSpec.getFormat().equals(_dateTimeFieldSpec.getFormat())) {
- return timeColumnValue;
- }
-
- long timeColumnValueMS = timeGranularitySpec.toMillis(timeColumnValue);
- DateTimeFormatSpec toFormat = new
DateTimeFormatSpec(_dateTimeFieldSpec.getFormat());
- return toFormat.fromMillisToFormat(timeColumnValueMS, Object.class);
- }
-
- @Override
- public void rewind()
- throws IOException {
- _baseRecordReader.rewind();
- }
-
- @Override
- public Schema getSchema() {
- return _schema;
- }
-
- @Override
- public void close()
- throws IOException {
- _baseRecordReader.close();
- }
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index 5cab0da..bdeaaae 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.segment.ReadMode;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
@@ -162,7 +163,8 @@ public class SegmentPurger {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig,
+ Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
index 97be252..d892a36 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/MapperRecordReader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.data.readers.MultiplePinotSegmentRecordReader;
import org.apache.pinot.spi.data.Schema;
@@ -55,7 +56,7 @@ public class MapperRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
index 52a7241..a727c79 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/segment/ReducerRecordReader.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.data.Schema;
@@ -52,7 +53,7 @@ public class ReducerRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
index 7ebd327..07f0615 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentRecordReader.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.realtime.converter;
import java.io.File;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.core.indexsegment.mutable.MutableSegmentImpl;
import org.apache.pinot.spi.data.Schema;
@@ -57,7 +58,7 @@ public class RealtimeSegmentRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig) {
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields) {
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index 9653174..2249271 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -57,6 +57,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderFactory;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -97,6 +98,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
Schema schema = segmentGeneratorConfig.getSchema();
FileFormat fileFormat = segmentGeneratorConfig.getFormat();
String recordReaderClassName = segmentGeneratorConfig.getRecordReaderPath();
+ Set<String> fields = SchemaFieldExtractorUtils.extractSourceFields(segmentGeneratorConfig.getSchema());
// Allow for instantiation general record readers from a record reader path passed into segment generator config
// If this is set, this will override the file format
@@ -108,7 +110,8 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
fileFormat);
}
return RecordReaderFactory
- .getRecordReaderByClass(recordReaderClassName, dataFile, schema, segmentGeneratorConfig.getReaderConfig());
+ .getRecordReaderByClass(recordReaderClassName, dataFile, schema, segmentGeneratorConfig.getReaderConfig(),
+ fields);
}
switch (fileFormat) {
@@ -118,7 +121,7 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
default:
try {
return org.apache.pinot.spi.data.readers.RecordReaderFactory
- .getRecordReader(fileFormat, dataFile, schema, segmentGeneratorConfig.getReaderConfig());
+ .getRecordReader(fileFormat, dataFile, schema, segmentGeneratorConfig.getReaderConfig(), fields);
} catch (Exception e) {
throw new UnsupportedOperationException("Unsupported input file format: '" + fileFormat + "'", e);
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/BackfillDateTimeRecordReaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/BackfillDateTimeRecordReaderTest.java
deleted file mode 100644
index 954e204..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/BackfillDateTimeRecordReaderTest.java
+++ /dev/null
@@ -1,256 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.readers;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.DateTimeFormatSpec;
-import org.apache.pinot.spi.data.DimensionFieldSpec;
-import org.apache.pinot.spi.data.FieldSpec;
-import org.apache.pinot.spi.data.FieldSpec.DataType;
-import org.apache.pinot.spi.data.MetricFieldSpec;
-import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.readers.GenericRow;
-import org.apache.pinot.core.minion.BackfillDateTimeColumn;
-import
org.apache.pinot.core.minion.BackfillDateTimeColumn.BackfillDateTimeRecordReader;
-import org.apache.pinot.spi.data.readers.RecordReader;
-import org.testng.Assert;
-import org.testng.annotations.DataProvider;
-import org.testng.annotations.Test;
-
-
-/**
- * Tests the PinotSegmentRecordReader to check that the records being generated
- * are the same as the records used to create the segment
- */
-public class BackfillDateTimeRecordReaderTest {
- private static final int NUM_ROWS = 10000;
- private static final String TABLE_NAME = "myTable";
- private static String D1 = "d1";
- private static String D2 = "d2";
- private static String M1 = "m1";
- private static String M2 = "m2";
-
- private List<GenericRow> createTestDataWithTimespec(TimeFieldSpec
timeFieldSpec) {
- List<GenericRow> rows = new ArrayList<>();
- Random random = new Random();
-
- Map<String, Object> fields;
- for (int i = 0; i < NUM_ROWS; i++) {
- fields = new HashMap<>();
- fields.put(D1, RandomStringUtils.randomAlphabetic(2));
- fields.put(D2, RandomStringUtils.randomAlphabetic(5));
- fields.put(M1, Math.abs(random.nextInt()));
- fields.put(M2, Math.abs(random.nextFloat()));
-
- long timestamp = System.currentTimeMillis();
- Object timeColumnValue =
timeFieldSpec.getIncomingGranularitySpec().fromMillis(timestamp);
- fields.put(timeFieldSpec.getName(), timeColumnValue);
-
- GenericRow row = new GenericRow();
- row.init(fields);
- rows.add(row);
- }
- return rows;
- }
-
- private List<GenericRow> createTestDataWithTimespec(TimeFieldSpec
timeFieldSpec,
- DateTimeFieldSpec dateTimeFieldSpec) {
- List<GenericRow> rows = new ArrayList<>();
- Random random = new Random();
-
- Map<String, Object> fields;
- for (int i = 0; i < NUM_ROWS; i++) {
- fields = new HashMap<>();
- fields.put(D1, RandomStringUtils.randomAlphabetic(2));
- fields.put(D2, RandomStringUtils.randomAlphabetic(5));
- fields.put(M1, Math.abs(random.nextInt()));
- fields.put(M2, Math.abs(random.nextFloat()));
-
- long timestamp = System.currentTimeMillis();
- Object timeColumnValue =
timeFieldSpec.getIncomingGranularitySpec().fromMillis(timestamp);
- fields.put(timeFieldSpec.getName(), timeColumnValue);
-
- DateTimeFormatSpec toFormat = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
- Object dateTimeColumnValue = toFormat.fromMillisToFormat(timestamp,
Object.class);
- fields.put(dateTimeFieldSpec.getName(), dateTimeColumnValue);
-
- GenericRow row = new GenericRow();
- row.init(fields);
- rows.add(row);
- }
- return rows;
- }
-
- private Schema createPinotSchemaWithTimeSpec(TimeFieldSpec timeSpec) {
- Schema testSchema = new Schema();
- testSchema.setSchemaName("schema");
- FieldSpec spec;
- spec = new DimensionFieldSpec(D1, DataType.STRING, true);
- testSchema.addField(spec);
- spec = new DimensionFieldSpec(D2, DataType.STRING, true);
- testSchema.addField(spec);
- spec = new MetricFieldSpec(M1, DataType.INT);
- testSchema.addField(spec);
- spec = new MetricFieldSpec(M2, DataType.FLOAT);
- testSchema.addField(spec);
- testSchema.addField(timeSpec);
- return testSchema;
- }
-
- private Schema createPinotSchemaWithTimeSpec(TimeFieldSpec timeSpec,
DateTimeFieldSpec dateTimeFieldSpec) {
- Schema testSchema = createPinotSchemaWithTimeSpec(timeSpec);
- testSchema.addField(dateTimeFieldSpec);
- return testSchema;
- }
-
- private Schema createPinotSchemaWrapperWithDateTimeSpec(Schema schema,
DateTimeFieldSpec dateTimeFieldSpec) {
- schema.addField(dateTimeFieldSpec);
- return schema;
- }
-
- @Test(dataProvider = "backfillRecordReaderDataProvider")
- public void testBackfillDateTimeRecordReader(RecordReader baseRecordReader,
TimeFieldSpec timeFieldSpec,
- DateTimeFieldSpec dateTimeFieldSpec, Schema schemaExpected)
- throws Exception {
- BackfillDateTimeColumn backfillDateTimeColumn =
- new BackfillDateTimeColumn(TABLE_NAME, new File("original"), new
File("backup"), timeFieldSpec,
- dateTimeFieldSpec);
- try (BackfillDateTimeRecordReader wrapperReader = backfillDateTimeColumn
- .getBackfillDateTimeRecordReader(baseRecordReader)) {
-
- // check that schema has new column
- Schema schemaActual = wrapperReader.getSchema();
- Assert.assertEquals(schemaActual, schemaExpected);
-
- DateTimeFieldSpec dateTimeFieldSpecActual =
schemaActual.getDateTimeSpec(dateTimeFieldSpec.getName());
- TimeFieldSpec timeFieldSpecActual = schemaActual.getTimeFieldSpec();
- Assert.assertEquals(dateTimeFieldSpecActual, dateTimeFieldSpec);
- Assert.assertEquals(timeFieldSpecActual, timeFieldSpec);
-
- while (wrapperReader.hasNext()) {
- GenericRow next = wrapperReader.next();
-
- // check that new datetime column is generated
- Object dateTimeColumnValueActual =
next.getValue(dateTimeFieldSpec.getName());
- Assert.assertNotNull(dateTimeColumnValueActual);
-
- Object timeColumnValueActual = next.getValue(timeFieldSpec.getName());
- Assert.assertNotNull(timeColumnValueActual);
-
- // check that datetime column has correct value as per its format
- Long timeColumnValueMS =
timeFieldSpec.getIncomingGranularitySpec().toMillis(timeColumnValueActual);
- DateTimeFormatSpec toFormat = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
- Object dateTimeColumnValueExpected =
toFormat.fromMillisToFormat(timeColumnValueMS, Object.class);
- Assert.assertEquals(dateTimeColumnValueActual,
dateTimeColumnValueExpected);
- }
- }
- }
-
- @DataProvider(name = "backfillRecordReaderDataProvider")
- public Object[][] getDataForTestBackfillRecordReader()
- throws Exception {
- List<Object[]> entries = new ArrayList<>();
-
- List<GenericRow> inputData;
- Schema inputSchema;
- RecordReader inputRecordReader;
- TimeFieldSpec timeFieldSpec;
- DateTimeFieldSpec dateTimeFieldSpec;
- Schema wrapperSchema;
-
- // timeSpec in hoursSinceEpoch, generate dateTimeFieldSpec in
millisSinceEpoch
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.HOURS, "Date"));
- inputData = createTestDataWithTimespec(timeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch",
DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
- wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema,
dateTimeFieldSpec);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpec, wrapperSchema});
-
- // timeSpec in hoursSinceEpoch, generate dateTimeFieldSpec in sdf day
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.HOURS, "Date"));
- inputData = createTestDataWithTimespec(timeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- dateTimeFieldSpec =
- new DateTimeFieldSpec("timestampInEpoch", DataType.LONG,
"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "1:HOURS");
- wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema,
dateTimeFieldSpec);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpec, wrapperSchema});
-
- // timeSpec in hoursSinceEpoch, generate dateTimeFieldSpec in
hoursSinceEpoch
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.HOURS, "Date"));
- inputData = createTestDataWithTimespec(timeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch",
DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
- wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema,
dateTimeFieldSpec);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpec, wrapperSchema});
-
- // timeSpec in millisSinceEpoch, generate dateTimeFieldSpec in 5
minutesSinceEpoch
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.MILLISECONDS, "Date"));
- inputData = createTestDataWithTimespec(timeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch",
DataType.LONG, "5:MILLISECONDS:EPOCH", "1:HOURS");
- wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(inputSchema,
dateTimeFieldSpec);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpec, wrapperSchema});
-
- // timeSpec in hoursSinceEpoch, dateTimeFieldSpec in millisSinceEpoch,
override dateTimeFieldSpec in millisSinceEpoch
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.HOURS, "Date"));
- dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch",
DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
- inputData = createTestDataWithTimespec(timeFieldSpec, dateTimeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec,
dateTimeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpec, inputSchema});
-
- // timeSpec in hoursSinceEpoch, dateTimeFieldSpec in hoursSinceEpoch,
override dateTimeFieldSpec in millisSinceEpoch
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.HOURS, "Date"));
- dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch",
DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
- inputData = createTestDataWithTimespec(timeFieldSpec, dateTimeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec,
dateTimeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- dateTimeFieldSpec = new DateTimeFieldSpec("timestampInEpoch",
DataType.LONG, "1:MILLISECONDS:EPOCH", "1:HOURS");
- wrapperSchema = createPinotSchemaWithTimeSpec(timeFieldSpec,
dateTimeFieldSpec);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpec, wrapperSchema});
-
- // timeSpec in hoursSinceEpoch, dateTimeFieldSpec in hoursSinceEpoch, add
new dateTimeFieldSpec in millisSinceEpoch
- timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG,
TimeUnit.HOURS, "Date"));
- dateTimeFieldSpec = new DateTimeFieldSpec("hoursSinceEpoch",
DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
- inputData = createTestDataWithTimespec(timeFieldSpec, dateTimeFieldSpec);
- inputSchema = createPinotSchemaWithTimeSpec(timeFieldSpec,
dateTimeFieldSpec);
- inputRecordReader = new GenericRowRecordReader(inputData, inputSchema);
- DateTimeFieldSpec dateTimeFieldSpecNew =
- new DateTimeFieldSpec("timestampInEpoch", DataType.LONG,
"1:MILLISECONDS:EPOCH", "1:HOURS");
- wrapperSchema = createPinotSchemaWithTimeSpec(timeFieldSpec,
dateTimeFieldSpec);
- wrapperSchema = createPinotSchemaWrapperWithDateTimeSpec(wrapperSchema,
dateTimeFieldSpecNew);
- entries.add(new Object[]{inputRecordReader, timeFieldSpec,
dateTimeFieldSpecNew, wrapperSchema});
-
- return entries.toArray(new Object[entries.size()][]);
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
index 0a4735f..69d766e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/RecordReaderSampleDataTest.java
@@ -19,8 +19,10 @@
package org.apache.pinot.core.data.readers;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
import java.io.File;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.core.data.recordtransformer.CompositeTransformer;
import org.apache.pinot.spi.data.FieldSpec;
@@ -62,6 +64,7 @@ public class RecordReaderSampleDataTest {
private final Schema SCHEMA_NO_INCOMING = new Schema.SchemaBuilder()
.addTime("incoming", TimeUnit.SECONDS, FieldSpec.DataType.LONG, "time_day", TimeUnit.DAYS, FieldSpec.DataType.INT)
.build();
+
// Outgoing time column does not exist in the record, should read incoming time only
private final Schema SCHEMA_NO_OUTGOING = new Schema.SchemaBuilder()
.addTime("time_day", TimeUnit.SECONDS, FieldSpec.DataType.LONG, "outgoing", TimeUnit.DAYS, FieldSpec.DataType.INT)
@@ -72,11 +75,11 @@ public class RecordReaderSampleDataTest {
throws Exception {
CompositeTransformer defaultTransformer = CompositeTransformer.getDefaultTransformer(SCHEMA);
try (RecordReader avroRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA, null);
+ .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA, null, SCHEMA.getColumnNames());
RecordReader csvRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA, null);
+ .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA, null, SCHEMA.getColumnNames());
RecordReader jsonRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA, null)) {
+ .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA, null, SCHEMA.getColumnNames())) {
int numRecords = 0;
while (avroRecordReader.hasNext()) {
assertTrue(csvRecordReader.hasNext());
@@ -117,11 +120,14 @@ public class RecordReaderSampleDataTest {
public void testDifferentIncomingOutgoing()
throws Exception {
try (RecordReader avroRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null);
+ .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null,
+ Sets.newHashSet("time_day", "column2"));
RecordReader csvRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null);
+ .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null,
+ Sets.newHashSet("time_day", "column2"));
RecordReader jsonRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null)) {
+ .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_DIFFERENT_INCOMING_OUTGOING, null,
+ Sets.newHashSet("time_day", "column2"))) {
int numRecords = 0;
while (avroRecordReader.hasNext()) {
assertTrue(csvRecordReader.hasNext());
@@ -151,11 +157,11 @@ public class RecordReaderSampleDataTest {
public void testNoIncoming()
throws Exception {
try (RecordReader avroRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null);
+ .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null, SCHEMA_NO_INCOMING.getColumnNames());
RecordReader csvRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null);
+ .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null, SCHEMA_NO_INCOMING.getColumnNames());
RecordReader jsonRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null)) {
+ .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_INCOMING, null, SCHEMA_NO_INCOMING.getColumnNames())) {
int numRecords = 0;
while (avroRecordReader.hasNext()) {
assertTrue(csvRecordReader.hasNext());
@@ -185,11 +191,12 @@ public class RecordReaderSampleDataTest {
public void testNoOutgoing()
throws Exception {
try (RecordReader avroRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null);
- RecordReader csvRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null);
- RecordReader jsonRecordReader = RecordReaderFactory
- .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null)) {
+ .getRecordReader(FileFormat.AVRO, AVRO_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null,
+ Sets.newHashSet("time_day", "outgoing")); RecordReader csvRecordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.CSV, CSV_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null,
+ Sets.newHashSet("time_day", "outgoing")); RecordReader jsonRecordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.JSON, JSON_SAMPLE_DATA_FILE, SCHEMA_NO_OUTGOING, null,
+ Sets.newHashSet("time_day", "outgoing"))) {
int numRecords = 0;
while (avroRecordReader.hasNext()) {
assertTrue(csvRecordReader.hasNext());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
index 7719eb8..264d1f1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplNullValueVectorTest.java
@@ -56,7 +56,8 @@ public class MutableSegmentImplNullValueVectorTest {
.createMutableSegmentImpl(_schema, Collections.emptySet(), Collections.emptySet(), Collections.emptySet(),
false, true);
GenericRow reuse = new GenericRow();
- try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.JSON, jsonFile, _schema, null)) {
+ try (RecordReader recordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.JSON, jsonFile, _schema, null, _schema.getColumnNames())) {
while (recordReader.hasNext()) {
recordReader.next(reuse);
GenericRow transformedRow = _recordTransformer.transform(reuse);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
index 49a53b2..b3ba6eb 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplTest.java
@@ -86,7 +86,8 @@ public class MutableSegmentImplTest {
StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(_lastIngestionTimeMs);
_startTimeMs = System.currentTimeMillis();
- try (RecordReader recordReader = RecordReaderFactory.getRecordReader(FileFormat.AVRO, avroFile, _schema, null)) {
+ try (RecordReader recordReader = RecordReaderFactory
+ .getRecordReader(FileFormat.AVRO, avroFile, _schema, null, _schema.getColumnNames())) {
GenericRow reuse = new GenericRow();
while (recordReader.hasNext()) {
_mutableSegmentImpl.index(recordReader.next(reuse), defaultMetadata);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
index 8a21772..a573d58 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConsumerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.realtime.impl.fakestream;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.MessageBatch;
@@ -30,6 +31,7 @@ import org.apache.pinot.spi.stream.StreamDecoderProvider;
import org.apache.pinot.spi.stream.StreamLevelConsumer;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
/**
@@ -47,7 +49,7 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
- String groupId) {
+ String groupId, Set<String> sourceFields) {
return new FakeStreamLevelConsumer();
}
@@ -91,7 +93,8 @@ public class FakeStreamConsumerFactory extends StreamConsumerFactory {
// Message decoder
Schema pinotSchema = FakeStreamConfigUtils.getPinotSchema();
- StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(streamConfig, pinotSchema);
+ StreamMessageDecoder streamMessageDecoder = StreamDecoderProvider.create(streamConfig, pinotSchema,
+ SchemaFieldExtractorUtils.extractSourceFields(pinotSchema));
GenericRow decodedRow = new GenericRow();
streamMessageDecoder.decode(messageBatch.getMessageAtIndex(0), decodedRow);
System.out.println(decodedRow);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
index 93bff70..c823cc6 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMessageDecoder.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.realtime.impl.fakestream;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
@@ -29,7 +30,7 @@ import org.apache.pinot.spi.stream.StreamMessageDecoder;
*/
public class FakeStreamMessageDecoder implements StreamMessageDecoder<byte[]> {
@Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName) throws Exception {
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields) throws Exception {
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 2db50c6..1d84a67 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -70,7 +70,6 @@ import org.apache.pinot.spi.config.table.TableTaskConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -353,13 +352,12 @@ public abstract class ClusterTest extends ControllerTest {
private DatumReader<GenericData.Record> _reader;
@Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
throws Exception {
// Load Avro schema
DataFileStream<GenericRecord> reader = AvroUtils.getAvroReader(avroFile);
_avroSchema = reader.getSchema();
reader.close();
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
_recordExtractor = new AvroRecordExtractor();
_recordExtractor.init(sourceFields, null);
_reader = new GenericDatumReader<>(_avroSchema);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
index 25ad714..73e07e7 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java
@@ -20,6 +20,7 @@ package org.apache.pinot.integration.tests;
import java.lang.reflect.Constructor;
import java.util.Random;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
@@ -53,12 +54,12 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
private Random _random = new Random();
public FlakyStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
- String groupId) {
+ String groupId, Set<String> sourceFields) {
try {
final Constructor constructor = Class.forName(KafkaStarterUtils.KAFKA_STREAM_LEVEL_CONSUMER_CLASS_NAME)
- .getConstructor(String.class, String.class, StreamConfig.class, Schema.class, String.class);
+ .getConstructor(String.class, String.class, StreamConfig.class, Schema.class, String.class, Set.class);
_streamLevelConsumer = (StreamLevelConsumer) constructor
- .newInstance(clientId, tableName, streamConfig, schema, groupId);
+ .newInstance(clientId, tableName, streamConfig, schema, groupId, sourceFields);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -111,8 +112,8 @@ public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeCluster
@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
- String groupId) {
- return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId);
+ String groupId, Set<String> sourceFields) {
+ return new FlakyStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
}
@Override
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 91f1028..353450e 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -81,12 +81,6 @@ public class SegmentGenerationTaskRunner implements Serializable {
recordReaderConfig = (RecordReaderConfig) JsonUtils.jsonNodeToObject(jsonNode, clazz);
}
- //init record reader
- String readerClassName = _taskSpec.getRecordReaderSpec().getClassName();
- RecordReader recordReader = PluginManager.get().createInstance(readerClassName);
-
- recordReader.init(new File(_taskSpec.getInputFilePath()), schema, recordReaderConfig);
-
//init segmentName Generator
SegmentNameGenerator segmentNameGenerator = getSegmentNameGerator();
@@ -96,10 +90,13 @@ public class SegmentGenerationTaskRunner implements Serializable {
segmentGeneratorConfig.setOutDir(_taskSpec.getOutputDirectoryPath());
segmentGeneratorConfig.setSegmentNameGenerator(segmentNameGenerator);
segmentGeneratorConfig.setSequenceId(_taskSpec.getSequenceId());
+ segmentGeneratorConfig.setReaderConfig(recordReaderConfig);
+ segmentGeneratorConfig.setRecordReaderPath(_taskSpec.getRecordReaderSpec().getClassName());
+ segmentGeneratorConfig.setInputFilePath(_taskSpec.getInputFilePath());
//build segment
SegmentIndexCreationDriverImpl segmentIndexCreationDriver = new SegmentIndexCreationDriverImpl();
- segmentIndexCreationDriver.init(segmentGeneratorConfig, recordReader);
+ segmentIndexCreationDriver.init(segmentGeneratorConfig);
segmentIndexCreationDriver.build();
return segmentIndexCreationDriver.getSegmentName();
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
index 61b7245..1a07dd6 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReader.java
@@ -28,7 +28,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
/**
@@ -45,7 +44,7 @@ public class AvroRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
throws IOException {
_dataFile = dataFile;
_schema = schema;
@@ -56,7 +55,6 @@ public class AvroRecordReader implements RecordReader {
_avroReader.close();
throw e;
}
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
_recordExtractor = new AvroRecordExtractor();
_recordExtractor.init(sourceFields, null);
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
index 0d1250d..7fed72d 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorMapTypeTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.inputformat.avro;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -26,6 +27,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -78,6 +80,11 @@ public class AvroRecordExtractorMapTypeTest extends AbstractRecordExtractorTest
return org.apache.pinot.spi.data.Schema.fromInputSteam(schemaInputStream);
}
+ @Override
+ protected Set<String> getSourceFields() {
+ return Sets.newHashSet("map1", "map2");
+ }
+
/**
* Create an AvroRecordReader
*/
@@ -85,7 +92,7 @@ public class AvroRecordExtractorMapTypeTest extends AbstractRecordExtractorTest
protected RecordReader createRecordReader()
throws IOException {
AvroRecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(_dataFile, _pinotSchema, null);
+ avroRecordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
return avroRecordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
index 568ec86..30b7531 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordExtractorTest.java
@@ -48,7 +48,7 @@ public class AvroRecordExtractorTest extends AbstractRecordExtractorTest {
protected RecordReader createRecordReader()
throws IOException {
AvroRecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(_dataFile, _pinotSchema, null);
+ avroRecordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
return avroRecordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
index 91c1daf..1ae0356 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordReaderTest.java
@@ -39,7 +39,7 @@ public class AvroRecordReaderTest extends AbstractRecordReaderTest {
protected RecordReader createRecordReader()
throws Exception {
AvroRecordReader avroRecordReader = new AvroRecordReader();
- avroRecordReader.init(_dataFile, getPinotSchema(), null);
+ avroRecordReader.init(_dataFile, getPinotSchema(), null, _sourceFields);
return avroRecordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
index 8e43788..8a43f61 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroRecordToPinotRowGeneratorTest.java
@@ -47,7 +47,7 @@ public class AvroRecordToPinotRowGeneratorTest {
new org.apache.pinot.spi.data.Schema.SchemaBuilder().setSchemaName("testSchema")
.addTime("incomingTime", TimeUnit.MILLISECONDS, FieldSpec.DataType.LONG, "outgoingTime", TimeUnit.DAYS, FieldSpec.DataType.INT).build();
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(pinotSchema);
+ Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(pinotSchema);
AvroRecordExtractor avroRecordExtractor = new AvroRecordExtractor();
avroRecordExtractor.init(sourceFields, null);
diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
index d433bf9..67e8b27 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/KafkaAvroMessageDecoder.java
@@ -41,7 +41,6 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
@@ -49,6 +48,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * An implementation of StreamMessageDecoder to read avro from a Kafka stream
+ * NOTE: Do not use schema in the implementation, as schema will be removed from the params
+ */
@NotThreadSafe
public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaAvroMessageDecoder.class);
@@ -82,7 +85,7 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private String[] _schemaRegistryUrls;
@Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
throws Exception {
_schemaRegistryUrls = parseSchemaRegistryUrls(props.get(SCHEMA_REGISTRY_REST_URL));
@@ -106,7 +109,6 @@ public class KafkaAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
LOGGER.info("Populated schema cache with schema for {}", hashKey);
}
}
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
// Backward compatibility to support Avro by default
if (recordExtractorClass == null) {
diff --git a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
index 4da8843..bb2cf9c 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/SimpleAvroMessageDecoder.java
@@ -33,11 +33,14 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * An implementation of StreamMessageDecoder to read simple avro records from stream
+ * NOTE: Do not use schema in the implementation, as schema will be removed from the params
+ */
@NotThreadSafe
public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(SimpleAvroMessageDecoder.class);
@@ -51,10 +54,9 @@ public class SimpleAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private GenericData.Record _avroRecordToReuse;
@Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
throws Exception {
Preconditions.checkState(props.containsKey(SCHEMA), "Avro schema must be provided");
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
_avroSchema = new org.apache.avro.Schema.Parser().parse(props.get(SCHEMA));
_datumReader = new GenericDatumReader<>(_avroSchema);
String recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
diff --git a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
index 2aa966b..849eb49 100644
--- a/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
+++ b/pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java
@@ -32,13 +32,13 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import static com.google.common.base.Preconditions.checkState;
/**
* Decodes avro messages with confluent schema registry.
* First byte is MAGIC = 0, second 4 bytes are the schema id, the remainder is the value.
+ * NOTE: Do not use schema in the implementation, as schema will be removed from the params
*/
public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final String SCHEMA_REGISTRY_REST_URL = "schema.registry.rest.url";
@@ -47,11 +47,9 @@ public class KafkaConfluentSchemaRegistryAvroMessageDecoder implements StreamMes
private String _topicName;
@Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName) throws Exception {
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields) throws Exception {
checkState(props.containsKey(SCHEMA_REGISTRY_REST_URL), "Missing required property '%s'", SCHEMA_REGISTRY_REST_URL);
String schemaRegistryUrl = props.get(SCHEMA_REGISTRY_REST_URL);
- Preconditions.checkNotNull(indexingSchema, "Schema must be provided");
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000);
_deserializer = new KafkaAvroDeserializer(schemaRegistryClient);
Preconditions.checkNotNull(topicName, "Topic must be provided");
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
index 2ad6318..df6ce4f 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/main/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReader.java
@@ -32,7 +32,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
/**
@@ -52,7 +51,7 @@ public class CSVRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
throws IOException {
_dataFile = dataFile;
_schema = schema;
@@ -95,7 +94,6 @@ public class CSVRecordReader implements RecordReader {
_format = format;
_multiValueDelimiter = config.getMultiValueDelimiter();
}
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
_recordExtractor = new CSVRecordExtractor();
CSVRecordExtractorConfig recordExtractorConfig = new CSVRecordExtractorConfig();
recordExtractorConfig.setMultiValueDelimiter(_multiValueDelimiter);
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
index d561f09..233e686 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordExtractorTest.java
@@ -49,7 +49,7 @@ public class CSVRecordExtractorTest extends AbstractRecordExtractorTest {
CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
CSVRecordReader csvRecordReader = new CSVRecordReader();
- csvRecordReader.init(_dataFile, _pinotSchema, csvRecordReaderConfig);
+ csvRecordReader.init(_dataFile, _pinotSchema, csvRecordReaderConfig, _sourceFieldNames);
return csvRecordReader;
}
@@ -59,13 +59,14 @@ public class CSVRecordExtractorTest extends AbstractRecordExtractorTest {
@Override
public void createInputFile()
throws IOException {
+ String[] header = _sourceFieldNames.toArray(new String[0]);
try (FileWriter fileWriter = new FileWriter(_dataFile); CSVPrinter csvPrinter = new CSVPrinter(fileWriter,
- CSVFormat.DEFAULT.withHeader(_sourceFieldNames.toArray(new String[0])))) {
+ CSVFormat.DEFAULT.withHeader(header))) {
for (Map<String, Object> inputRecord : _inputRecords) {
- Object[] record = new Object[_sourceFieldNames.size()];
- for (int i = 0; i < _sourceFieldNames.size(); i++) {
- Object value = inputRecord.get(_sourceFieldNames.get(i));
+ Object[] record = new Object[header.length];
+ for (int i = 0; i < header.length; i++) {
+ Object value = inputRecord.get(header[i]);
if (value instanceof Collection) {
record[i] = StringUtils.join(((List) value).toArray(), CSV_MULTI_VALUE_DELIMITER);
} else {
diff --git a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
index cef8d71..58bcb16 100644
--- a/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-csv/src/test/java/org/apache/pinot/plugin/inputformat/csv/CSVRecordReaderTest.java
@@ -43,7 +43,7 @@ public class CSVRecordReaderTest extends AbstractRecordReaderTest {
CSVRecordReaderConfig csvRecordReaderConfig = new CSVRecordReaderConfig();
csvRecordReaderConfig.setMultiValueDelimiter(CSV_MULTI_VALUE_DELIMITER);
CSVRecordReader csvRecordReader = new CSVRecordReader();
- csvRecordReader.init(_dataFile, getPinotSchema(), csvRecordReaderConfig);
+ csvRecordReader.init(_dataFile, getPinotSchema(), csvRecordReaderConfig, _sourceFields);
return csvRecordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
index 4636c86..c9a2fb4 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/main/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
/**
@@ -60,11 +59,10 @@ public class JSONRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
throws IOException {
_dataFile = dataFile;
_schema = schema;
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
_recordExtractor = new JSONRecordExtractor();
_recordExtractor.init(sourceFields, null);
init();
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
index cfea8fd..9451425 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordExtractorTest.java
@@ -42,7 +42,7 @@ public class JSONRecordExtractorTest extends AbstractRecordExtractorTest {
protected RecordReader createRecordReader()
throws IOException {
JSONRecordReader recordReader = new JSONRecordReader();
- recordReader.init(_dataFile, _pinotSchema, null);
+ recordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
return recordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
index f89a6ac..b4904e1 100644
--- a/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-json/src/test/java/org/apache/pinot/plugin/inputformat/json/JSONRecordReaderTest.java
@@ -38,7 +38,7 @@ public class JSONRecordReaderTest extends AbstractRecordReaderTest {
protected RecordReader createRecordReader()
throws Exception {
JSONRecordReader recordReader = new JSONRecordReader();
- recordReader.init(_dateFile, getPinotSchema(), null);
+ recordReader.init(_dateFile, getPinotSchema(), null, _sourceFields);
return recordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
index 824a33b..2bf8193 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/main/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReader.java
@@ -45,7 +45,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.pinot.spi.utils.StringUtils;
@@ -75,7 +74,7 @@ public class ORCRecordReader implements RecordReader {
private int _nextRowId;
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
throws IOException {
_schema = schema;
@@ -89,7 +88,6 @@ public class ORCRecordReader implements RecordReader {
_orcFieldTypes = orcSchema.getChildren();
// Only read the required fields
- Set<String> schemaFields = SchemaFieldExtractorUtils.extract(schema);
int numOrcFields = _orcFields.size();
_includeOrcFields = new boolean[numOrcFields];
// NOTE: Include for ORC reader uses field id as the index
@@ -97,7 +95,7 @@ public class ORCRecordReader implements RecordReader {
orcReaderInclude[orcSchema.getId()] = true;
for (int i = 0; i < numOrcFields; i++) {
String field = _orcFields.get(i);
- if (schemaFields.contains(field)) {
+ if (sourceFields.contains(field)) {
TypeDescription fieldType = _orcFieldTypes.get(i);
TypeDescription.Category category = fieldType.getCategory();
if (category == TypeDescription.Category.LIST) {
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
index 8e891f3..7b4c7c5 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordExtractorTest.java
@@ -50,7 +50,7 @@ public class ORCRecordExtractorTest extends AbstractRecordExtractorTest {
protected RecordReader createRecordReader()
throws IOException {
ORCRecordReader orcRecordReader = new ORCRecordReader();
- orcRecordReader.init(_dataFile, _pinotSchema, null);
+ orcRecordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
return orcRecordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
index 252874b..3cf9de2 100644
--- a/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-orc/src/test/java/org/apache/pinot/plugin/inputformat/orc/ORCRecordReaderTest.java
@@ -44,7 +44,7 @@ public class ORCRecordReaderTest extends AbstractRecordReaderTest {
protected RecordReader createRecordReader()
throws Exception {
ORCRecordReader orcRecordReader = new ORCRecordReader();
- orcRecordReader.init(_dataFile, _pinotSchema, null);
+ orcRecordReader.init(_dataFile, _pinotSchema, null, _sourceFields);
return orcRecordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
index d4ed56f..eb71d7e 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/main/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReader.java
@@ -29,7 +29,6 @@ import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
/**
@@ -43,12 +42,11 @@ public class ParquetRecordReader implements RecordReader {
private GenericRecord _nextRecord;
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
throws IOException {
_dataFilePath = new Path(dataFile.getAbsolutePath());
_schema = schema;
ParquetUtils.validateSchema(_schema, ParquetUtils.getParquetSchema(_dataFilePath));
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
_recordExtractor = new ParquetRecordExtractor();
_recordExtractor.init(sourceFields, null);
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
index 976d935..95c6496 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordExtractorTest.java
@@ -54,7 +54,7 @@ public class ParquetRecordExtractorTest extends AbstractRecordExtractorTest {
protected RecordReader createRecordReader()
throws IOException {
ParquetRecordReader recordReader = new ParquetRecordReader();
- recordReader.init(_dataFile, _pinotSchema, null);
+ recordReader.init(_dataFile, _pinotSchema, null, _sourceFieldNames);
return recordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
index 4761e46..b577799 100644
--- a/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-parquet/src/test/java/org/apache/pinot/plugin/inputformat/parquet/ParquetRecordReaderTest.java
@@ -39,7 +39,7 @@ public class ParquetRecordReaderTest extends AbstractRecordReaderTest {
protected RecordReader createRecordReader()
throws Exception {
ParquetRecordReader recordReader = new ParquetRecordReader();
- recordReader.init(_dataFile, getPinotSchema(), null);
+ recordReader.init(_dataFile, getPinotSchema(), null, _sourceFields);
return recordReader;
}
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
index 42bcd34..5670b04 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/main/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReader.java
@@ -30,7 +30,6 @@ import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.data.readers.RecordReaderConfig;
import org.apache.pinot.spi.data.readers.RecordReaderUtils;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.thrift.TBase;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -76,7 +75,7 @@ public class ThriftRecordReader implements RecordReader {
}
@Override
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig config)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig config, Set<String> sourceFields)
throws IOException {
ThriftRecordReaderConfig recordReaderConfig = (ThriftRecordReaderConfig) config;
_dataFile = dataFile;
@@ -94,7 +93,6 @@ public class ThriftRecordReader implements RecordReader {
_fieldIds.put(tFieldIdEnum.getFieldName(), index);
index++;
}
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(schema);
ThriftRecordExtractorConfig recordExtractorConfig = new ThriftRecordExtractorConfig();
recordExtractorConfig.setFieldIds(_fieldIds);
_recordExtractor = new ThriftRecordExtractor();
diff --git a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
index 899c54e..906c6b3 100644
--- a/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-thrift/src/test/java/org/apache/pinot/plugin/inputformat/thrift/ThriftRecordReaderTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.inputformat.thrift;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
@@ -95,7 +96,7 @@ public class ThriftRecordReaderTest {
public void testReadData()
throws IOException {
ThriftRecordReader recordReader = new ThriftRecordReader();
- recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig());
+ recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig(), getSourceFields());
List<GenericRow> genericRows = new ArrayList<>();
while (recordReader.hasNext()) {
genericRows.add(recordReader.next());
@@ -114,7 +115,7 @@ public class ThriftRecordReaderTest {
public void testRewind()
throws IOException {
ThriftRecordReader recordReader = new ThriftRecordReader();
- recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig());
+ recordReader.init(_tempFile, getSchema(), getThriftRecordReaderConfig(), getSourceFields());
List<GenericRow> genericRows = new ArrayList<>();
while (recordReader.hasNext()) {
genericRows.add(recordReader.next());
@@ -150,6 +151,10 @@ public class ThriftRecordReaderTest {
.addMultiValueDimension("set_values", FieldSpec.DataType.STRING).build();
}
+ private Set<String> getSourceFields() {
+ return Sets.newHashSet("id", "name", "created_at", "active", "groups",
"set_values");
+ }
+
@AfterClass
public void tearDown() {
FileUtils.deleteQuietly(_tempFile);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
index 18228ea..1145ba7 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaConsumerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kafka09;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -51,8 +52,8 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
*/
@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
- String groupId) {
- return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId);
+ String groupId, Set<String> sourceFields) {
+ return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
}
/**
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
index c727d31..38dcef3 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-0.9/src/main/java/org/apache/pinot/plugin/stream/kafka09/KafkaStreamLevelConsumer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kafka09;
+import java.util.Set;
import kafka.consumer.ConsumerIterator;
import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.pinot.spi.stream.StreamConfig;
@@ -52,12 +53,12 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
private long currentCount = 0L;
public KafkaStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
- String groupId) {
+ String groupId, Set<String> sourceFields) {
_clientId = clientId;
_streamConfig = streamConfig;
_kafkaHighLevelStreamConfig = new KafkaHighLevelStreamConfig(streamConfig, tableName, groupId);
- _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+ _messageDecoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);
_tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
INSTANCE_LOGGER = LoggerFactory
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
index 444e8f7..8c5c793 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaConsumerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.plugin.stream.kafka20;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
@@ -33,8 +34,8 @@ public class KafkaConsumerFactory extends StreamConsumerFactory {
@Override
public StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
- String groupId) {
- return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId);
+ String groupId, Set<String> sourceFields) {
+ return new KafkaStreamLevelConsumer(clientId, tableName, _streamConfig, schema, groupId, sourceFields);
}
@Override
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
index 60a04b7..e01ac94 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamLevelConsumer.java
@@ -22,6 +22,7 @@ import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -64,12 +65,12 @@ public class KafkaStreamLevelConsumer implements StreamLevelConsumer {
public KafkaStreamLevelConsumer(String clientId, String tableName, StreamConfig streamConfig, Schema schema,
- String groupId) {
+ String groupId, Set<String> sourceFields) {
_clientId = clientId;
_streamConfig = streamConfig;
_kafkaStreamLevelStreamConfig = new KafkaStreamLevelStreamConfig(streamConfig, tableName, groupId);
- _messageDecoder = StreamDecoderProvider.create(streamConfig, schema);
+ _messageDecoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);
_tableAndStreamName = tableName + "-" + streamConfig.getTopicName();
INSTANCE_LOGGER = LoggerFactory
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
index 5ee48c0..3355247 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/main/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoder.java
@@ -21,13 +21,10 @@ package org.apache.pinot.plugin.stream.kafka;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.pinot.spi.data.readers.RecordExtractor;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -37,6 +34,10 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * An implementation of StreamMessageDecoder to read JSON records from a stream
+ * NOTE: Do not use schema in the implementation, as schema will be removed from the params
+ */
public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaJSONMessageDecoder.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -45,9 +46,8 @@ public class KafkaJSONMessageDecoder implements StreamMessageDecoder<byte[]> {
private RecordExtractor<Map<String, Object>> _jsonRecordExtractor;
@Override
- public void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ public void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
throws Exception {
- Set<String> sourceFields = SchemaFieldExtractorUtils.extract(indexingSchema);
String recordExtractorClass = null;
if (props != null) {
recordExtractorClass = props.get(RECORD_EXTRACTOR_CONFIG_KEY);
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
index b1553d0..29fa10f 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-base/src/test/java/org/apache/pinot/plugin/stream/kafka/KafkaJSONMessageDecoderTest.java
@@ -63,7 +63,7 @@ public class KafkaJSONMessageDecoderTest {
try (BufferedReader reader = new BufferedReader(
new FileReader(getClass().getClassLoader().getResource("data/test_sample_data.json").getFile()))) {
KafkaJSONMessageDecoder decoder = new KafkaJSONMessageDecoder();
- decoder.init(new HashMap<>(), schema, "testTopic");
+ decoder.init(new HashMap<>(), schema, "testTopic", schema.getColumnNames());
GenericRow r = new GenericRow();
String line = reader.readLine();
while (line != null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
index 77eaec5..dcb242d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/function/evaluators/DefaultTimeSpecEvaluator.java
@@ -19,7 +19,6 @@
package org.apache.pinot.spi.data.function.evaluators;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.List;
import org.apache.pinot.spi.data.TimeFieldSpec;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
index 4af4687..23be865 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReader.java
@@ -21,6 +21,7 @@ package org.apache.pinot.spi.data.readers;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.Schema;
@@ -38,11 +39,12 @@ public interface RecordReader extends Closeable {
* @param dataFile Data file
* @param schema Pinot Schema associated with the table
* @param recordReaderConfig Config for the reader specific to the format. e.g. delimiter for csv format etc
+ * @param sourceFields The fields to extract from the source record
* @throws IOException If an I/O error occurs
*
- * TODO: decouple Schema and RecordReader
+ * TODO: decouple Schema and RecordReader. This should be easier, now that we have field names to extract
*/
- void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> sourceFields)
throws IOException;
/**
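For plugin authors, the widened init means a reader no longer derives its field set from the schema; it simply keeps what the caller hands in. A skeletal, hypothetical implementation of the new contract (not part of the patch; the GenericRow setter name is an assumption and may differ across versions):

    import java.io.File;
    import java.io.IOException;
    import java.util.Set;
    import javax.annotation.Nullable;
    import org.apache.pinot.spi.data.Schema;
    import org.apache.pinot.spi.data.readers.GenericRow;
    import org.apache.pinot.spi.data.readers.RecordReader;
    import org.apache.pinot.spi.data.readers.RecordReaderConfig;

    // Hypothetical reader that emits a single row with a placeholder value per requested field.
    public class SingleRowRecordReader implements RecordReader {
      private Set<String> _sourceFields;
      private boolean _consumed;

      @Override
      public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig,
          Set<String> sourceFields)
          throws IOException {
        _sourceFields = sourceFields;  // supplied by the caller; no schema walk needed here
        _consumed = false;
      }

      @Override
      public boolean hasNext() {
        return !_consumed;
      }

      @Override
      public GenericRow next()
          throws IOException {
        return next(new GenericRow());
      }

      @Override
      public GenericRow next(GenericRow reuse)
          throws IOException {
        for (String sourceField : _sourceFields) {
          reuse.putValue(sourceField, "placeholder");  // assumes GenericRow.putValue(String, Object)
        }
        _consumed = true;
        return reuse;
      }

      @Override
      public void rewind() {
        _consumed = false;
      }

      @Override
      public void close() {
      }
    }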
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
index e7cb6d9..8817852 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/RecordReaderFactory.java
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -120,10 +121,10 @@ public class RecordReaderFactory {
* @throws Exception
*/
public static RecordReader getRecordReaderByClass(String recordReaderClassName, File dataFile, Schema schema,
- RecordReaderConfig recordReaderConfig)
+ RecordReaderConfig recordReaderConfig, Set<String> fields)
throws Exception {
RecordReader recordReader = PluginManager.get().createInstance(recordReaderClassName);
- recordReader.init(dataFile, schema, recordReaderConfig);
+ recordReader.init(dataFile, schema, recordReaderConfig, fields);
return recordReader;
}
@@ -138,9 +139,9 @@ public class RecordReaderFactory {
* @throws Exception Any exception while initializing the RecordReader
*/
public static RecordReader getRecordReader(FileFormat fileFormat, File dataFile, Schema schema,
- RecordReaderConfig recordReaderConfig)
+ RecordReaderConfig recordReaderConfig, Set<String> fields)
throws Exception {
- return getRecordReader(fileFormat.name(), dataFile, schema, recordReaderConfig);
+ return getRecordReader(fileFormat.name(), dataFile, schema, recordReaderConfig, fields);
}
/**
@@ -154,12 +155,12 @@ public class RecordReaderFactory {
* @throws Exception Any exception while initializing the RecordReader
*/
public static RecordReader getRecordReader(String fileFormatStr, File dataFile, Schema schema,
- RecordReaderConfig recordReaderConfig)
+ RecordReaderConfig recordReaderConfig, Set<String> fields)
throws Exception {
String fileFormatKey = fileFormatStr.toUpperCase();
if (DEFAULT_RECORD_READER_CLASS_MAP.containsKey(fileFormatKey)) {
return getRecordReaderByClass(DEFAULT_RECORD_READER_CLASS_MAP.get(fileFormatKey), dataFile, schema,
- recordReaderConfig);
+ recordReaderConfig, fields);
}
throw new UnsupportedOperationException(
"No supported RecordReader found for file format - '" + fileFormatStr + "'");
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index 60b1da2..caa5d6b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.stream;
+import java.util.Set;
import org.apache.pinot.spi.data.Schema;
@@ -49,10 +50,11 @@ public abstract class StreamConsumerFactory {
* @param tableName the table name for the topic of this consumer
* @param schema the pinot schema of the event being consumed
* @param groupId consumer group Id
- * @return
+ * @param sourceFields the fields to extract from the source stream
+ * @return the stream level consumer
*/
public abstract StreamLevelConsumer createStreamLevelConsumer(String clientId, String tableName, Schema schema,
- String groupId);
+ String groupId, Set<String> sourceFields);
/**
* Creates a metadata provider which provides partition specific metadata
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
index e4cac8a..852be98 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamDecoderProvider.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.plugin.PluginManager;
@@ -31,17 +32,18 @@ public abstract class StreamDecoderProvider {
/**
* Constructs a {@link StreamMessageDecoder} using properties in {@link StreamConfig} and initializes it
- * @param streamConfig
- * @param schema
- * @return
+ * @param streamConfig the stream configs from the table config
+ * @param schema the schema of the Pinot table
+ * @param sourceFields the fields to extract from the source stream
+ * @return the StreamMessageDecoder
*/
- public static StreamMessageDecoder create(StreamConfig streamConfig, Schema schema) {
+ public static StreamMessageDecoder create(StreamConfig streamConfig, Schema schema, Set<String> sourceFields) {
StreamMessageDecoder decoder = null;
String decoderClass = streamConfig.getDecoderClass();
Map<String, String> decoderProperties = streamConfig.getDecoderProperties();
try {
decoder = PluginManager.get().createInstance(decoderClass);
- decoder.init(decoderProperties, schema, streamConfig.getTopicName());
+ decoder.init(decoderProperties, schema, streamConfig.getTopicName(), sourceFields);
} catch (Exception e) {
ExceptionUtils.rethrow(e);
}
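Caller side, this is exactly what the Kafka stream-level consumers above now do; a minimal sketch, assuming the stream config and schema are in scope:

    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    StreamMessageDecoder decoder = StreamDecoderProvider.create(streamConfig, schema, sourceFields);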
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
index 05a78ef..c3a7347 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMessageDecoder.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.spi.annotations.InterfaceAudience;
import org.apache.pinot.spi.annotations.InterfaceStability;
import org.apache.pinot.spi.data.Schema;
@@ -33,14 +34,17 @@ import org.apache.pinot.spi.data.readers.GenericRow;
@InterfaceStability.Stable
public interface StreamMessageDecoder<T> {
- static final String RECORD_EXTRACTOR_CONFIG_KEY = "recordExtractorClass";
+ String RECORD_EXTRACTOR_CONFIG_KEY = "recordExtractorClass";
/**
- * Initialize the decoder with decoder properties map, the stream topic name and stream schema
- * @param props
+ * Initialize the decoder
+ * @param props decoder properties extracted from the {@link StreamConfig}
+ * @param indexingSchema the Pinot schema TODO: Remove Schema from StreamMessageDecoder. Do not use inside the implementation, as this will be removed
+ * @param topicName topic name of the stream
+ * @param sourceFields the fields to be read from the source stream's record
* @throws Exception
*/
- void init(Map<String, String> props, Schema indexingSchema, String topicName)
+ void init(Map<String, String> props, Schema indexingSchema, String topicName, Set<String> sourceFields)
throws Exception;
/**
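On the implementing side, a decoder honoring this contract initializes its RecordExtractor from the passed-in fields and leaves the schema parameter untouched, per the NOTE above. A hypothetical fragment (the JSON extractor is chosen only for illustration):

    private RecordExtractor<Map<String, Object>> _recordExtractor;

    @Override
    public void init(Map<String, String> props, Schema indexingSchema, String topicName,
        Set<String> sourceFields)
        throws Exception {
      // Deliberately ignore indexingSchema: it is slated for removal from this SPI.
      _recordExtractor = new JSONRecordExtractor();
      _recordExtractor.init(sourceFields, null);
    }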
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
index 1a53756..0c34046 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtils.java
@@ -19,17 +19,15 @@
package org.apache.pinot.spi.utils;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
+import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluator;
-import org.apache.pinot.spi.data.function.evaluators.ExpressionEvaluatorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +48,7 @@ public class SchemaFieldExtractorUtils {
*
* TODO: for now, we assume that arguments to transform function are in the source i.e. there's no columns which are derived from transformed columns
*/
- public static Set<String> extract(Schema schema) {
+ public static Set<String> extractSourceFields(Schema schema) {
Set<String> sourceFieldNames = new HashSet<>();
for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
if (!fieldSpec.isVirtualColumn()) {
@@ -64,22 +62,6 @@ public class SchemaFieldExtractorUtils {
return sourceFieldNames;
}
- @VisibleForTesting
- public static Set<String> extractSource(Schema schema) {
- Set<String> sourceFieldNames = new HashSet<>();
- for (FieldSpec fieldSpec : schema.getAllFieldSpecs()) {
- if (!fieldSpec.isVirtualColumn()) {
- ExpressionEvaluator expressionEvaluator = ExpressionEvaluatorFactory.getExpressionEvaluator(fieldSpec);
- if (expressionEvaluator != null) {
- sourceFieldNames.addAll(expressionEvaluator.getArguments());
- } else {
- sourceFieldNames.add(fieldSpec.getName());
- }
- }
- }
- return sourceFieldNames;
- }
-
/**
* Validates that for a field spec with transform function, the source column name and destination column name are exclusive
* i.e. do not allow using source column name for destination column
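With the @VisibleForTesting extractSource variant removed above, extractSourceFields is the single entry point. A small usage sketch matching the behavior exercised in SchemaFieldExtractorUtilsTest later in this patch (schema contents are illustrative):

    // A "map__KEYS" column resolves to both itself and the underlying "map" source field.
    Schema schema = new Schema.SchemaBuilder().setSchemaName("example")
        .addMultiValueDimension("map__KEYS", FieldSpec.DataType.INT)
        .build();
    Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
    // sourceFields now contains "map" and "map__KEYS"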
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
index 2c3b3f7..636262a 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordExtractorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.spi.data.readers;
+import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
@@ -28,9 +29,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
@@ -43,7 +44,7 @@ import org.testng.annotations.Test;
public abstract class AbstractRecordExtractorTest {
protected Schema _pinotSchema;
- protected List<String> _sourceFieldNames;
+ protected Set<String> _sourceFieldNames;
protected List<Map<String, Object>> _inputRecords;
private RecordReader _recordReader;
protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordTransformationTest");
@@ -53,7 +54,7 @@ public abstract class AbstractRecordExtractorTest {
throws IOException {
FileUtils.forceMkdir(_tempDir);
_pinotSchema = getPinotSchema();
- _sourceFieldNames = new ArrayList<>(SchemaFieldExtractorUtils.extractSource(_pinotSchema));
+ _sourceFieldNames = getSourceFields();
_inputRecords = getInputRecords();
createInputFile();
_recordReader = createRecordReader();
@@ -90,6 +91,10 @@ public abstract class AbstractRecordExtractorTest {
return inputRecords;
}
+ protected Set<String> getSourceFields() {
+ return Sets.newHashSet("user_id", "firstName", "lastName", "bids",
"campaignInfo", "cost", "timestamp");
+ }
+
@AfterClass
public void tearDown()
throws Exception {
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
index 085d609..7303a73 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/readers/AbstractRecordReaderTest.java
@@ -18,12 +18,14 @@
*/
package org.apache.pinot.spi.data.readers;
+import com.google.common.collect.Sets;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
+import java.util.Set;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.spi.data.FieldSpec;
@@ -41,6 +43,7 @@ public abstract class AbstractRecordReaderTest {
protected final File _tempDir = new File(FileUtils.getTempDirectory(), "RecordReaderTest");
protected List<Map<String, Object>> _records;
protected org.apache.pinot.spi.data.Schema _pinotSchema;
+ protected Set<String> _sourceFields;
private RecordReader _recordReader;
private static List<Map<String, Object>> generateRandomRecords(Schema pinotSchema) {
@@ -123,12 +126,17 @@ public abstract class AbstractRecordReaderTest {
.addMetric("met_double", FieldSpec.DataType.DOUBLE).build();
}
+ protected Set<String> getSourceFields(Schema schema) {
+ return Sets.newHashSet(schema.getColumnNames());
+ }
+
@BeforeClass
public void setUp()
throws Exception {
FileUtils.forceMkdir(_tempDir);
// Generate Pinot schema
_pinotSchema = getPinotSchema();
+ _sourceFields = getSourceFields(_pinotSchema);
// Generate random records based on Pinot schema
_records = generateRandomRecords(_pinotSchema);
// Write generated random records to file
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
index 0dfd4ee..23659cf 100644
index 0dfd4ee..23659cf 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/plugin/PluginManagerTest.java
@@ -73,7 +73,7 @@ public class PluginManagerTest {
PluginManager.get().load("test-record-reader", jarDirFile);
RecordReader testRecordReader = PluginManager.get().createInstance("test-record-reader", "TestRecordReader");
- testRecordReader.init(null, null, null);
+ testRecordReader.init(null, null, null, null);
int count = 0;
while (testRecordReader.hasNext()) {
GenericRow row = testRecordReader.next();
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
index c8cd285..7c1f038 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/SchemaFieldExtractorUtilsTest.java
@@ -28,7 +28,6 @@ import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
import org.apache.pinot.spi.data.TimeGranularitySpec;
-import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -49,7 +48,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec.setTransformFunction("Groovy({function}, argument1, argument2)");
schema.addField(dimensionFieldSpec);
- List<String> extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+ List<String> extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 3);
Assert.assertTrue(extract.containsAll(Arrays.asList("d1", "argument1",
"argument2")));
@@ -59,7 +58,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec.setTransformFunction("Groovy({function})");
schema.addField(dimensionFieldSpec);
- extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+ extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 1);
Assert.assertTrue(extract.contains("d1"));
@@ -68,7 +67,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec = new DimensionFieldSpec("map__KEYS", FieldSpec.DataType.INT, false);
schema.addField(dimensionFieldSpec);
- extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+ extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__KEYS")));
@@ -77,7 +76,7 @@ public class SchemaFieldExtractorUtilsTest {
dimensionFieldSpec = new DimensionFieldSpec("map__VALUES", FieldSpec.DataType.LONG, false);
schema.addField(dimensionFieldSpec);
- extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+ extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("map", "map__VALUES")));
@@ -87,7 +86,7 @@ public class SchemaFieldExtractorUtilsTest {
TimeFieldSpec timeFieldSpec = new TimeFieldSpec("time", FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS);
schema.addField(timeFieldSpec);
- extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+ extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 1);
Assert.assertTrue(extract.contains("time"));
@@ -98,7 +97,7 @@ public class SchemaFieldExtractorUtilsTest {
TimeUnit.MILLISECONDS);
schema.addField(timeFieldSpec);
- extract = new ArrayList<>(SchemaFieldExtractorUtils.extract(schema));
+ extract = new ArrayList<>(SchemaFieldExtractorUtils.extractSourceFields(schema));
Assert.assertEquals(extract.size(), 2);
Assert.assertTrue(extract.containsAll(Arrays.asList("in", "out")));
}
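
Per the assertions above, the renamed extractSourceFields returns the columns a reader must pull from the input, including transform-function arguments and the map column underlying the __KEYS/__VALUES suffixes. A minimal illustrative sketch (not part of the patch; the STRING type here is an assumption):

  Schema schema = new Schema();
  DimensionFieldSpec d1 = new DimensionFieldSpec("d1", FieldSpec.DataType.STRING, true);
  d1.setTransformFunction("Groovy({function}, argument1, argument2)");
  schema.addField(d1);
  // Expected per the test: {"d1", "argument1", "argument2"} -- the Groovy
  // arguments count as source columns even though they are not schema fields.
  Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);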
diff --git a/pinot-spi/src/test/resources/TestRecordReader.java b/pinot-spi/src/test/resources/TestRecordReader.java
index 8f212a1..a223d58 100644
--- a/pinot-spi/src/test/resources/TestRecordReader.java
+++ b/pinot-spi/src/test/resources/TestRecordReader.java
@@ -3,6 +3,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
@@ -21,7 +22,7 @@ public class TestRecordReader implements RecordReader {
Iterator<GenericRow> _iterator;
Schema _schema;
- public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig)
+ public void init(File dataFile, Schema schema, @Nullable RecordReaderConfig recordReaderConfig, Set<String> fields)
throws IOException {
_schema = schema;
int numRows = 10;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
index 298387a..7b52630 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/PinotAdministrator.java
@@ -26,7 +26,6 @@ import org.apache.pinot.tools.admin.command.AddTableCommand;
import org.apache.pinot.tools.admin.command.AddTenantCommand;
import org.apache.pinot.tools.admin.command.AnonymizeDataCommand;
import org.apache.pinot.tools.admin.command.AvroSchemaToPinotSchema;
-import org.apache.pinot.tools.admin.command.BackfillDateTimeColumnCommand;
import org.apache.pinot.tools.admin.command.ChangeNumReplicasCommand;
import org.apache.pinot.tools.admin.command.ChangeTableState;
import org.apache.pinot.tools.admin.command.CreateSegmentCommand;
@@ -119,7 +118,6 @@ public class PinotAdministrator {
@SubCommand(name = "VerifySegmentState", impl =
VerifySegmentState.class),
@SubCommand(name = "ConvertPinotSegment", impl =
PinotSegmentConvertCommand.class),
@SubCommand(name = "MoveReplicaGroup", impl = MoveReplicaGroup.class),
- @SubCommand(name = "BackfillSegmentColumn", impl =
BackfillDateTimeColumnCommand.class),
@SubCommand(name = "VerifyClusterState", impl =
VerifyClusterStateCommand.class),
@SubCommand(name = "RealtimeProvisioningHelper", impl =
RealtimeProvisioningHelperCommand.class),
@SubCommand(name = "MergeSegments", impl = SegmentMergeCommand.class),
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
deleted file mode 100644
index bb0e3dc..0000000
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/BackfillDateTimeColumnCommand.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.tools.admin.command;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
-import org.apache.pinot.core.minion.BackfillDateTimeColumn;
-import org.apache.pinot.spi.data.DateTimeFieldSpec;
-import org.apache.pinot.spi.data.TimeFieldSpec;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.apache.pinot.spi.utils.builder.TableNameBuilder;
-import org.apache.pinot.tools.Command;
-import org.apache.pinot.tools.backfill.BackfillSegmentUtils;
-import org.kohsuke.args4j.Option;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Class to download a segment, and backfill it with dateTimeFieldSpec corresponding to the timeFieldSpec
- *
- */
-public class BackfillDateTimeColumnCommand extends AbstractBaseAdminCommand implements Command {
- private static final String OUTPUT_FOLDER = "output";
- private static final String DOWNLOAD_FOLDER = "download";
- private static final String BACKUP_FOLDER = "backup";
-
- private static final Logger LOGGER = LoggerFactory.getLogger(BackfillDateTimeColumnCommand.class);
-
- @Option(name = "-controllerHost", required = true, metaVar = "<String>", usage = "host name for controller.")
- private String _controllerHost;
-
- @Option(name = "-controllerPort", required = true, metaVar = "<int>", usage = "Port number for controller.")
- private String _controllerPort = DEFAULT_CONTROLLER_PORT;
-
- @Option(name = "-tableName", required = true, metaVar = "<string>", usage = "Name of the table to backfill")
- private String _tableName;
-
- @Option(name = "-segmentNames", required = false, metaVar = "<string>", usage = "Comma separated names of the segments to backfill (if not specified, all segments will be backfilled)")
- private String _segmentNames;
-
- @Option(name = "-segmentType", required = false, metaVar = "<OFFLINE/REALTIME>", usage = "Type of segments to backfill (if not specified, all types will be backfilled)")
- private SegmentType _segmentType;
-
- @Option(name = "-srcTimeFieldSpec", required = true, metaVar = "<string>", usage = "File containing timeFieldSpec as json")
- private String _srcTimeFieldSpec;
-
- @Option(name = "-destDateTimeFieldSpec", required = true, metaVar = "<string>", usage = "File containing dateTimeFieldSpec as json")
- private String _destDateTimeFieldSpec;
-
- @Option(name = "-backupDir", required = true, metaVar = "<string>", usage = "Path to backup segments")
- private String _backupDir;
-
- @Option(name = "-help", required = false, help = true, aliases = {"-h", "--h", "--help"}, usage = "Print this message.")
- private boolean _help = false;
-
- public BackfillDateTimeColumnCommand setControllerHost(String controllerHost) {
- _controllerHost = controllerHost;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setControllerPort(String controllerPort) {
- _controllerPort = controllerPort;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setTableName(String tableName) {
- _tableName = tableName;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setSegmentNames(String segmentNames) {
- _segmentNames = segmentNames;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setSegmentType(SegmentType segmentType) {
- _segmentType = segmentType;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setSrcTimeFieldSpec(String srcTimeFieldSpec) {
- _srcTimeFieldSpec = srcTimeFieldSpec;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setDestDateTimeFieldSpec(String destDateTimeFieldSpec) {
- _destDateTimeFieldSpec = destDateTimeFieldSpec;
- return this;
- }
-
- public BackfillDateTimeColumnCommand setBackupDir(String backupDir) {
- _backupDir = backupDir;
- return this;
- }
-
- @Override
- public String toString() {
- return ("BackfillSegmentColumn -controllerHost " + _controllerHost + " -controllerPort " + _controllerPort
- + " -tableName " + _tableName + " -segmentNames " + _segmentNames + " -segmentType " + _segmentType
- + " -srcTimeFieldSpec " + _srcTimeFieldSpec + " _destDateTimeFieldSpec " + _destDateTimeFieldSpec
- + " -backupDir " + _backupDir);
- }
-
- @Override
- public final String getName() {
- return "BackfillSegmentColumn";
- }
-
- @Override
- public String description() {
- return "Backfill a column in segments of a pinot table, with a millis value corresponding to the time column";
- }
-
- @Override
- public boolean getHelp() {
- return _help;
- }
-
- @Override
- public boolean execute()
- throws Exception {
- LOGGER.info("Executing command: {}", toString());
-
- if (_controllerHost == null || _controllerPort == null) {
- throw new RuntimeException("Must specify controller host and port.");
- }
-
- if (_backupDir == null) {
- throw new RuntimeException("Must specify path to backup segments");
- }
-
- if (_srcTimeFieldSpec == null || _destDateTimeFieldSpec == null) {
- throw new RuntimeException("Must specify srcTimeFieldSpec and destTimeFieldSpec.");
- }
- TimeFieldSpec timeFieldSpec = JsonUtils.fileToObject(new File(_srcTimeFieldSpec), TimeFieldSpec.class);
- DateTimeFieldSpec dateTimeFieldSpec =
- JsonUtils.fileToObject(new File(_destDateTimeFieldSpec), DateTimeFieldSpec.class);
-
- if (_tableName == null) {
- throw new RuntimeException("Must specify tableName.");
- }
-
- BackfillSegmentUtils backfillSegmentUtils = new BackfillSegmentUtils(_controllerHost, _controllerPort);
-
- List<String> segmentNames = new ArrayList<>();
- List<String> allSegmentNames = backfillSegmentUtils.getAllSegments(_tableName, _segmentType);
- if (_segmentNames == null) {
- segmentNames = allSegmentNames;
- } else {
- for (String segmentName : _segmentNames.split(",")) {
- if (allSegmentNames.contains(segmentName)) {
- segmentNames.add(segmentName);
- } else {
- throw new RuntimeException("Segment with name " + segmentName + " does not exist.");
- }
- }
- }
-
- File backupDir = new File(_backupDir, BACKUP_FOLDER);
- File tableBackupDir = new File(backupDir, _tableName);
- File downloadDir = new File(TMP_DIR, DOWNLOAD_FOLDER);
- LOGGER.info("Backup dir {}", tableBackupDir);
- LOGGER.info("DownloadDir {}", downloadDir);
-
- for (String segmentName : segmentNames) {
-
- LOGGER.info("\n\nSegment {}", segmentName);
-
- // download segment
- File downloadSegmentDir = new File(downloadDir, segmentName);
- LOGGER.info("Downloading segment {} to {}", segmentName, downloadDir.getAbsolutePath());
- boolean downloadStatus =
- backfillSegmentUtils.downloadSegment(_tableName, segmentName, downloadSegmentDir, tableBackupDir);
- LOGGER.info("Download status for segment {} is {}", segmentName, downloadStatus);
- if (!downloadStatus) {
- LOGGER.error("Failed to download segment {}. Skipping it.", segmentName);
- continue;
- }
-
- // create new segment
- String rawTableName = TableNameBuilder.extractRawTableName(_tableName);
- File segmentDir = new File(downloadSegmentDir, segmentName);
- File outputDir = new File(downloadSegmentDir, OUTPUT_FOLDER);
- BackfillDateTimeColumn backfillDateTimeColumn =
- new BackfillDateTimeColumn(rawTableName, segmentDir, outputDir, timeFieldSpec, dateTimeFieldSpec);
- boolean backfillStatus = backfillDateTimeColumn.backfill();
- LOGGER
- .info("Backfill status for segment {} in {} to {} is {}", segmentName, segmentDir, outputDir, backfillStatus);
-
- // upload segment
- LOGGER.info("Uploading segment {} to host: {} port: {}", segmentName, _controllerHost, _controllerPort);
- backfillSegmentUtils.uploadSegment(_tableName, segmentName, new File(outputDir, segmentName), outputDir);
- }
-
- // verify that all segments exist
- List<String> missingSegments = new ArrayList<>();
- allSegmentNames = backfillSegmentUtils.getAllSegments(_tableName, _segmentType);
- for (String segmentName : segmentNames) {
- if (!allSegmentNames.contains(segmentName)) {
- missingSegments.add(segmentName);
- }
- }
- if (missingSegments.size() != 0) {
- LOGGER.error("Failed to backfill and upload segments {}", missingSegments);
- return false;
- }
-
- LOGGER.info("Original segment backup is at {}", tableBackupDir.getAbsolutePath());
- return true;
- }
-}
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
index 8424c82..680b808 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/CreateSegmentCommand.java
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.pinot.tools.Command;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
@@ -364,7 +365,7 @@ public class CreateSegmentCommand extends AbstractBaseAdminCommand implements Co
if (_readerConfigFile != null) {
readerConfig = JsonUtils.fileToObject(new File(_readerConfigFile), CSVRecordReaderConfig.class);
}
- csvRecordReader.init(localFile, schema, readerConfig);
+ csvRecordReader.init(localFile, schema, readerConfig, SchemaFieldExtractorUtils.extractSourceFields(schema));
driver.init(config, csvRecordReader);
break;
default:
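
The CSV branch above shows the wiring used throughout this patch: derive the source fields from the schema once, then pass them to the reader's init. Sketched in isolation (localFile, schema and readerConfig are in scope in the surrounding method):

  CSVRecordReader csvRecordReader = new CSVRecordReader();
  Set<String> sourceFields = SchemaFieldExtractorUtils.extractSourceFields(schema);
  csvRecordReader.init(localFile, schema, readerConfig, sourceFields);
  driver.init(config, csvRecordReader);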
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
index af80892..f78ed77 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/MeetupRsvpStream.java
@@ -36,6 +36,7 @@ import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.stream.StreamDataProducer;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.SchemaFieldExtractorUtils;
import org.apache.pinot.tools.utils.KafkaStarterUtils;
import org.glassfish.tyrus.client.ClientManager;
@@ -67,7 +68,7 @@ public class MeetupRsvpStream {
try {
final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create().build();
final StreamMessageDecoder decoder = PluginManager.get().createInstance(KafkaStarterUtils.KAFKA_JSON_MESSAGE_DECODER_CLASS_NAME);
- decoder.init(null, schema, null);
+ decoder.init(null, schema, null, SchemaFieldExtractorUtils.extractSourceFields(schema));
client = ClientManager.createClient();
client.connectToServer(new Endpoint() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]