This is an automated email from the ASF dual-hosted git repository.
nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 430d48e The last of the getTimeFieldSpec calls (#5378)
430d48e is described below
commit 430d48e3fb97ad66905b9fad117fd7de3ad2952e
Author: Neha Pawar <[email protected]>
AuthorDate: Fri May 15 15:53:02 2020 -0700
The last of the getTimeFieldSpec calls (#5378)
Removing the last of the getTimeFieldSpec calls.
1. SegmentGeneratorConfig now uses the new getSpecForTimeColumn() call,
which returns a DateTimeFieldSpec. The time column must be set in the table
config if it is to be considered when creating a segment. (This also sets the
stage for allowing a DateTimeFieldSpec to be a primary time column.)
2. Removing TimeFieldSpec special handling from
RealtimeSegmentConverter.getUpdatedSchema. There is no need to remove the
incoming time spec. Neither the recordReader nor the dataSource requires the
updated schema. Plus, the record transformer is a pass-through.
3. Removing special casing for TimeFieldSpec in the
Schema.isBackwardCompatibleWith() method. The for loop over all field specs
already includes the time field spec.
---
.../generator/SegmentGeneratorConfig.java | 50 ++++++----------------
.../minion/rollup/MergeRollupSegmentConverter.java | 1 -
.../converter/RealtimeSegmentConverter.java | 17 +-------
.../generator/SegmentGeneratorConfigTest.java | 16 ++-----
.../index/loader/SegmentPreProcessorTest.java | 2 +-
.../converter/RealtimeSegmentConverterTest.java | 17 --------
.../java/org/apache/pinot/spi/data/Schema.java | 13 +++---
.../org/apache/pinot/tools/HybridQuickstart.java | 14 +++---
.../pinot/tools/streams/AirlineDataStream.java | 12 +++---
9 files changed, 43 insertions(+), 99 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 7126ca4..59531fe 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import org.apache.commons.lang.StringUtils;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
@@ -42,6 +43,8 @@ import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
import org.apache.pinot.spi.config.table.StarTreeIndexConfig;
import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.FieldSpec.FieldType;
import org.apache.pinot.spi.data.Schema;
@@ -118,10 +121,6 @@ public class SegmentGeneratorConfig {
setSchema(schema);
// NOTE: SegmentGeneratorConfig#setSchema doesn't set the time column
anymore. timeColumnName is expected to be read from table config.
- // If time column name is not set in table config, read time from schema.
- // WARN: Once we move to DateTimeFieldSpec - table config has to be
provided with valid time - if time needs to be set.
- // We cannot deduce whether 1) one of the provided DateTimes should be
used as time column 2) if yes, which one
- // Even if only 1 DateTime exists, we cannot determine whether it should
be primary time column (there could be no time column for table (REFRESH), but
still multiple DateTimeFieldSpec)
String timeColumnName = null;
if (tableConfig.getValidationConfig() != null) {
timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
@@ -182,42 +181,21 @@ public class SegmentGeneratorConfig {
}
/**
- * Set time column details using the given time column. If not found, use
schema
+ * Set time column details using the given time column
*/
- public void setTime(String timeColumnName, Schema schema) {
+ public void setTime(@Nullable String timeColumnName, Schema schema) {
if (timeColumnName != null) {
- FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
- if (fieldSpec != null) {
- setTime(fieldSpec);
- return;
+ DateTimeFieldSpec dateTimeFieldSpec =
schema.getSpecForTimeColumn(timeColumnName);
+ if (dateTimeFieldSpec != null) {
+ setTimeColumnName(dateTimeFieldSpec.getName());
+ DateTimeFormatSpec formatSpec = new
DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
+ if
(formatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) {
+ setSegmentTimeUnit(formatSpec.getColumnUnit());
+ } else {
+ setSimpleDateFormat(formatSpec.getSDFPattern());
+ }
}
}
- setTime(schema.getTimeFieldSpec());
- }
-
- /**
- * Set time column details using the given field spec
- */
- private void setTime(FieldSpec timeSpec) {
- if (timeSpec == null) {
- return;
- }
- TimeFieldSpec timeFieldSpec = (TimeFieldSpec) timeSpec;
- setTimeColumnName(timeFieldSpec.getName());
-
- TimeGranularitySpec timeGranularitySpec =
timeFieldSpec.getOutgoingGranularitySpec();
-
- String timeFormat = timeGranularitySpec.getTimeFormat();
- if (timeFormat.equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) {
- // Time format: 'EPOCH'
- setSegmentTimeUnit(timeGranularitySpec.getTimeType());
- } else {
- // Time format: 'SIMPLE_DATE_FORMAT:<pattern>'
-
Preconditions.checkArgument(timeFormat.startsWith(TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString()),
- "Invalid time format: %s, must be one of '%s' or '%s:<pattern>'",
timeFormat,
- TimeGranularitySpec.TimeFormat.EPOCH,
TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT);
- setSimpleDateFormat(timeFormat.substring(timeFormat.indexOf(':') + 1));
- }
}
/**
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
index 3b7d089..9e79070 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/minion/rollup/MergeRollupSegmentConverter.java
@@ -112,7 +112,6 @@ public class MergeRollupSegmentConverter {
for (DateTimeFieldSpec dateTimeFieldSpec : schema.getDateTimeFieldSpecs())
{
groupByColumns.add(dateTimeFieldSpec.getName());
}
- // TODO: once time column starts showing up as dateTimeFieldSpec
(https://github.com/apache/incubator-pinot/issues/2756) below lines becomes
redundant
String timeColumnName =
_tableConfig.getValidationConfig().getTimeColumnName();
if (timeColumnName != null && !groupByColumns.contains(timeColumnName)) {
groupByColumns.add(timeColumnName);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
index 48c63a4..f8e5098 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/realtime/converter/RealtimeSegmentConverter.java
@@ -150,26 +150,13 @@ public class RealtimeSegmentConverter {
}
/**
- * Returns a new schema based on the original one. The new schema removes
columns as needed (for ex, virtual cols)
- * and adds the new timespec to the schema.
+ * Returns a new schema containing only physical columns
*/
@VisibleForTesting
public Schema getUpdatedSchema(Schema original) {
Schema newSchema = new Schema();
- TimeFieldSpec tfs = original.getTimeFieldSpec();
- if (tfs != null) {
- // Use outgoing granularity for creating segment
- TimeGranularitySpec outgoing = tfs.getOutgoingGranularitySpec();
- if (outgoing != null) {
- TimeFieldSpec newTimeSpec = new TimeFieldSpec(outgoing);
- newSchema.addField(newTimeSpec);
- }
- }
-
for (String col : original.getPhysicalColumnNames()) {
- if ((tfs == null) || (!col.equals(tfs.getName()))) {
- newSchema.addField(original.getFieldSpecFor(col));
- }
+ newSchema.addField(original.getFieldSpecFor(col));
}
return newSchema;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
index 6c19432..05e8a7d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
@@ -40,22 +40,18 @@ public class SegmentGeneratorConfigTest {
.addTime(new TimeGranularitySpec(FieldSpec.DataType.INT,
TimeUnit.DAYS, "daysSinceEpoch"), null).build();
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("daysSinceEpoch").build();
- // table config provided
SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch");
assertEquals(segmentGeneratorConfig.getTimeColumnType(),
SegmentGeneratorConfig.TimeColumnType.EPOCH);
assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS);
assertNull(segmentGeneratorConfig.getSimpleDateFormat());
- // table config not provided
- // NOTE: this behavior will not hold true when we move to
dateTimeFieldSpec.
// MUST provide valid tableConfig with time column if time details are
wanted
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
- assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch");
- assertEquals(segmentGeneratorConfig.getTimeColumnType(),
SegmentGeneratorConfig.TimeColumnType.EPOCH);
- assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS);
+ assertNull(segmentGeneratorConfig.getTimeColumnName());
+ assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
assertNull(segmentGeneratorConfig.getSimpleDateFormat());
}
@@ -66,22 +62,18 @@ public class SegmentGeneratorConfigTest {
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("Date").build();
- // Table config provided
SegmentGeneratorConfig segmentGeneratorConfig = new
SegmentGeneratorConfig(tableConfig, schema);
assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date");
assertEquals(segmentGeneratorConfig.getTimeColumnType(),
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE);
assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd");
- // Table config not provided
- // NOTE: this behavior will not hold true when we move to
dateTimeFieldSpec.
// MUST provide valid tableConfig with time column if time details are
wanted
tableConfig =
new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
- assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date");
- assertEquals(segmentGeneratorConfig.getTimeColumnType(),
SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE);
+ assertNull(segmentGeneratorConfig.getTimeColumnName());
assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
- assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd");
+ assertNull(segmentGeneratorConfig.getSimpleDateFormat());
}
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
index edc9fbe..3da79bb 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/loader/SegmentPreProcessorTest.java
@@ -118,7 +118,7 @@ public class SegmentPreProcessorTest {
Assert.assertNotNull(resourceUrl);
_schema = Schema.fromFile(new File(resourceUrl.getFile()));
_tableConfig =
- new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daySinceEpoch").build();
+ new
TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
resourceUrl = classLoader.getResource(NEW_COLUMNS_SCHEMA1);
Assert.assertNotNull(resourceUrl);
_newColumnsSchema1 = Schema.fromFile(new File(resourceUrl.getFile()));
diff --git
a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
index c09a3d6..d37f32c 100644
---
a/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/realtime/converter/RealtimeSegmentConverterTest.java
@@ -46,28 +46,11 @@ public class RealtimeSegmentConverterTest {
String segmentName = "segment1";
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema,
segmentName);
Assert.assertEquals(schema.getColumnNames().size(), 5);
-
Assert.assertEquals(schema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(),
TimeUnit.MILLISECONDS);
RealtimeSegmentConverter converter =
new RealtimeSegmentConverter(null, "", schema, "testTable",
tableConfig, segmentName, "col1");
Schema newSchema = converter.getUpdatedSchema(schema);
Assert.assertEquals(newSchema.getColumnNames().size(), 2);
-
Assert.assertEquals(newSchema.getTimeFieldSpec().getIncomingGranularitySpec().getTimeType(),
TimeUnit.DAYS);
- }
-
- @Test
- public void testNoTimeColumnsInSchema() {
- Schema schema = new Schema();
- schema.addField(new DimensionFieldSpec("col1", FieldSpec.DataType.STRING,
true));
- schema.addField(new DimensionFieldSpec("col2", FieldSpec.DataType.STRING,
true));
- schema.addField(new DimensionFieldSpec("col3", FieldSpec.DataType.STRING,
true));
- schema.addField(new MetricFieldSpec("met1", FieldSpec.DataType.DOUBLE, 0));
- schema.addField(new MetricFieldSpec("met2", FieldSpec.DataType.LONG, 0));
- Assert.assertEquals(schema.getColumnNames().size(), 5);
- RealtimeSegmentConverter converter =
- new RealtimeSegmentConverter(null, "", schema, "testTable", null,
"segment1", "col1");
- Schema newSchema = converter.getUpdatedSchema(schema);
- Assert.assertEquals(newSchema.getColumnNames().size(), 5);
}
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index a3a2b64..0659839 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -578,19 +578,18 @@ public final class Schema {
* Backward compatibility requires all columns and fieldSpec in oldSchema
should be retained.
*
* @param oldSchema old schema
- * @return
*/
public boolean isBackwardCompatibleWith(Schema oldSchema) {
- if (!EqualityUtils.isEqual(_timeFieldSpec, oldSchema.getTimeFieldSpec())
|| !EqualityUtils
- .isEqual(_dateTimeFieldSpecs, oldSchema.getDateTimeFieldSpecs())) {
- return false;
- }
+ Set<String> columnNames = getColumnNames();
for (Map.Entry<String, FieldSpec> entry :
oldSchema.getFieldSpecMap().entrySet()) {
- if (!getColumnNames().contains(entry.getKey())) {
+ String oldSchemaColumnName = entry.getKey();
+ if (!columnNames.contains(oldSchemaColumnName)) {
return false;
}
- if (!getFieldSpecFor(entry.getKey()).equals(entry.getValue())) {
+ FieldSpec oldSchemaFieldSpec = entry.getValue();
+ FieldSpec fieldSpec = getFieldSpecFor(oldSchemaColumnName);
+ if (!fieldSpec.equals(oldSchemaFieldSpec)) {
return false;
}
}
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
index 1b0adcc..ac87795 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java
@@ -25,11 +25,13 @@ import java.io.IOException;
import java.net.URL;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.utils.ZkStarter;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.plugin.PluginManager;
import org.apache.pinot.spi.stream.StreamDataProvider;
import org.apache.pinot.spi.stream.StreamDataServerStartable;
+import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.tools.Quickstart.Color;
import org.apache.pinot.tools.admin.command.QuickstartRunner;
import org.apache.pinot.tools.streams.AirlineDataStream;
@@ -45,6 +47,7 @@ public class HybridQuickstart {
private StreamDataServerStartable _kafkaStarter;
private ZkStarter.ZookeeperInstance _zookeeperInstance;
private File _schemaFile;
+ private File _realtimeTableConfigFile;
private File _dataFile;
private File _ingestionJobSpecFile;
@@ -92,18 +95,18 @@ public class HybridQuickstart {
}
_dataFile = new File(_realtimeQuickStartDataDir, "airlineStats_data.avro");
- File tableConfigFile = new File(_realtimeQuickStartDataDir,
"airlineStats_realtime_table_config.json");
+ _realtimeTableConfigFile = new File(_realtimeQuickStartDataDir,
"airlineStats_realtime_table_config.json");
URL resource = Quickstart.class.getClassLoader().getResource(
"examples/stream/airlineStats/airlineStats_realtime_table_config.json");
Preconditions.checkNotNull(resource);
- FileUtils.copyURLToFile(resource, tableConfigFile);
+ FileUtils.copyURLToFile(resource, _realtimeTableConfigFile);
resource = Quickstart.class.getClassLoader().getResource(
"examples/stream/airlineStats/sample_data/airlineStats_data.avro");
Preconditions.checkNotNull(resource);
FileUtils.copyURLToFile(resource, _dataFile);
- return new QuickstartTableRequest("airlineStats", _schemaFile,
tableConfigFile);
+ return new QuickstartTableRequest("airlineStats", _schemaFile,
_realtimeTableConfigFile);
}
private void startKafka() {
@@ -136,8 +139,9 @@ public class HybridQuickstart {
runner.launchDataIngestionJob();
printStatus(Color.YELLOW, "***** Starting airline data stream and
publishing to Kafka *****");
-
- final AirlineDataStream stream = new
AirlineDataStream(Schema.fromFile(_schemaFile), _dataFile);
+ Schema schema = Schema.fromFile(_schemaFile);
+ TableConfig tableConfig = JsonUtils.fileToObject(_realtimeTableConfigFile,
TableConfig.class);
+ final AirlineDataStream stream = new AirlineDataStream(schema,
tableConfig, _dataFile);
stream.run();
printStatus(Color.YELLOW, "***** Pinot Hybrid with hybrid table setup is
complete *****");
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
index 897ca00..97b670b 100644
---
a/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
+++
b/pinot-tools/src/main/java/org/apache/pinot/tools/streams/AirlineDataStream.java
@@ -30,6 +30,8 @@ import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.pinot.plugin.inputformat.avro.AvroUtils;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -48,6 +50,7 @@ public class AirlineDataStream {
private static final Logger logger =
LoggerFactory.getLogger(AirlineDataStream.class);
Schema pinotSchema;
+ String timeColumnName;
File avroFile;
DataFileStream<GenericRecord> avroDataStream;
Integer currentTimeValue = 16102;
@@ -56,9 +59,10 @@ public class AirlineDataStream {
int counter = 0;
private StreamDataProducer producer;
- public AirlineDataStream(Schema pinotSchema, File avroFile)
+ public AirlineDataStream(Schema pinotSchema, TableConfig tableConfig, File
avroFile)
throws Exception {
this.pinotSchema = pinotSchema;
+ this.timeColumnName =
tableConfig.getValidationConfig().getTimeColumnName();
this.avroFile = avroFile;
createStream();
Properties properties = new Properties();
@@ -120,13 +124,11 @@ public class AirlineDataStream {
message.put(spec.getName(), record.get(spec.getName()));
}
- for (FieldSpec spec : pinotSchema.getDimensionFieldSpecs()) {
+ for (FieldSpec spec : pinotSchema.getMetricFieldSpecs()) {
message.put(spec.getName(), record.get(spec.getName()));
}
- TimeFieldSpec spec = pinotSchema.getTimeFieldSpec();
- String timeColumn = spec.getIncomingGranularitySpec().getName();
- message.put(timeColumn, currentTimeValue);
+ message.put(timeColumnName, currentTimeValue);
try {
publish(message);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]