This is an automated email from the ASF dual-hosted git repository.
apucher pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 08d26d6 [TE] datasource - correction for maxtime offset in pinot datasource (#3598)
08d26d6 is described below
commit 08d26d68b7abe4ab9e9866b39af91a907489fcd6
Author: Alexander Pucher <[email protected]>
AuthorDate: Mon Dec 10 14:35:14 2018 -0800
[TE] datasource - correction for maxtime offset in pinot datasource (#3598)
This PR fixes the time offset computed by the Pinot data source max time
fetcher. Previously, this method computed offsets from epoch without
considering the time zone of the underlying data set. Additionally, the PR adds
a sanity cutoff so that the max time query only considers values up to 1 day
after the current timestamp. As a consequence, this PR also fixes retrieval of
dimension filter keys and values on data sets with a time granularity > 1 week.
---
.../com/linkedin/thirdeye/api/TimeGranularity.java | 25 ++++++++++++++++++++++
.../pinot/PinotDataSourceDimensionFilters.java | 2 +-
.../datasource/pinot/PinotDataSourceMaxTime.java | 25 +++++++++++-----------
3 files changed, 38 insertions(+), 14 deletions(-)
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
index a5ffaf3..bcf6127 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
@@ -19,6 +19,8 @@ package com.linkedin.thirdeye.api;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.DurationFieldType;
import org.joda.time.Period;
import org.joda.time.PeriodType;
@@ -77,6 +79,29 @@ public class TimeGranularity {
}
/**
+ * Returns the epoch milliseconds for the specified number of this time
+ * granularity, counted from the epoch and evaluated in the given time zone.
+ *
+ * <p>Numbers that fit in an {@code int} are converted with {@link #toPeriod(int)}
+ * and added to the epoch instant expressed in {@code timeZone}; Joda-Time applies
+ * the period's fields using that zone's chronology. Numbers outside the
+ * {@code int} range cannot be passed to {@code toPeriod(int)} and are only
+ * supported for the fixed-length units MILLISECONDS and SECONDS, where the time
+ * zone does not affect the result.
+ *
+ * @param number the specified number of this time granularity.
+ * @param timeZone the time zone to base the timestamp off of
+ * @return the timestamp in millis
+ * @throws IllegalArgumentException if {@code number} is outside the {@code int}
+ *     range and the unit is neither MILLISECONDS nor SECONDS, or if converting a
+ *     SECONDS value to millis would overflow a {@code long}
+ */
+ public long toMillis(long number, DateTimeZone timeZone) {
+   // toPeriod takes an int, so route BOTH out-of-range directions here; checking
+   // only the upper bound lets values below Integer.MIN_VALUE be silently
+   // truncated by the (int) cast below.
+   if (number > Integer.MAX_VALUE || number < Integer.MIN_VALUE) {
+     // NOTE(review): presumably TimeGranularity also carries a size (e.g. 5 SECONDS);
+     // this branch does not scale by it -- confirm callers only reach it with
+     // size-1 granularities.
+     switch (this.getUnit()) {
+       case MILLISECONDS:
+         return number;
+       case SECONDS:
+         try {
+           // multiplyExact fails loudly instead of wrapping around on overflow.
+           return Math.multiplyExact(number, 1000L);
+         } catch (ArithmeticException e) {
+           throw new IllegalArgumentException("epoch offset too large: " + number, e);
+         }
+       default:
+         throw new IllegalArgumentException("epoch offset too large");
+     }
+   }
+
+   return new DateTime(0, timeZone).plus(this.toPeriod((int) number)).getMillis();
+ }
+
+ /**
* Returns an equivalent Period object of this time granularity.
*
* @return an equivalent Period object of this time granularity.
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
index 28a683e..a5f7b25 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
@@ -71,7 +71,7 @@ public class PinotDataSourceDimensionFilters {
// left blank
}
- DateTime endDateTime = new DateTime(maxTime);
+ DateTime endDateTime = new DateTime(maxTime).plusMillis(1);
DateTime startDateTime = endDateTime.minusDays(7);
Map<String, List<String>> filters = null;
diff --git
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
index b4a035e..4f728a4 100644
---
a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
+++
b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
@@ -26,8 +26,10 @@ import com.linkedin.thirdeye.util.ThirdEyeUtils;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
@@ -41,7 +43,7 @@ public class PinotDataSourceMaxTime {
private static final Logger LOGGER =
LoggerFactory.getLogger(PinotDataSourceMaxTime.class);
private static final DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
- private final static String COLLECTION_MAX_TIME_QUERY_TEMPLATE = "SELECT
max(%s) FROM %s WHERE %s >= %s";
+ private final static String COLLECTION_MAX_TIME_QUERY_TEMPLATE = "SELECT
max(%s) FROM %s WHERE %s";
private final Map<String, Long> collectionToPrevMaxDataTimeMap = new
ConcurrentHashMap<String, Long>();
private final PinotThirdEyeDataSource pinotThirdEyeDataSource;
@@ -63,9 +65,11 @@ public class PinotDataSourceMaxTime {
// By default, query only offline, unless dataset has been marked as
realtime
String tableName = ThirdEyeUtils.computeTableName(dataset);
TimeSpec timeSpec =
ThirdEyeUtils.getTimestampTimeSpecFromDatasetConfig(datasetConfig);
- long prevMaxDataTime = getPrevMaxDataTime(dataset);
- String maxTimePql = String.format(COLLECTION_MAX_TIME_QUERY_TEMPLATE,
timeSpec.getColumnName(), tableName,
- timeSpec.getColumnName(), prevMaxDataTime);
+
+ long cutoffTime = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1);
+ String timeClause = PqlUtils.getBetweenClause(new DateTime(0,
DateTimeZone.UTC), new DateTime(cutoffTime, DateTimeZone.UTC), timeSpec,
dataset);
+
+ String maxTimePql = String.format(COLLECTION_MAX_TIME_QUERY_TEMPLATE,
timeSpec.getColumnName(), tableName, timeClause);
PinotQuery maxTimePinotQuery = new PinotQuery(maxTimePql, tableName);
ThirdEyeResultSetGroup resultSetGroup;
@@ -83,15 +87,17 @@ public class PinotDataSourceMaxTime {
LOGGER.error("Failed to get latest max time for dataset {} with PQL:
{}", tableName, maxTimePinotQuery.getPql());
this.collectionToPrevMaxDataTimeMap.remove(dataset);
} else {
+ DateTimeZone timeZone = Utils.getDataTimeZone(dataset);
+
long endTime = new
Double(resultSetGroup.get(0).getDouble(0)).longValue();
this.collectionToPrevMaxDataTimeMap.put(dataset, endTime);
// endTime + 1 to make sure we cover the time range of that time value.
String timeFormat = timeSpec.getFormat();
if (StringUtils.isBlank(timeFormat) ||
TimeSpec.SINCE_EPOCH_FORMAT.equals(timeFormat)) {
- maxTime = timeSpec.getDataGranularity().toMillis(endTime + 1) - 1;
+ maxTime = timeSpec.getDataGranularity().toMillis(endTime + 1,
timeZone) - 1;
} else {
DateTimeFormatter inputDataDateTimeFormatter =
-
DateTimeFormat.forPattern(timeFormat).withZone(Utils.getDataTimeZone(dataset));
+ DateTimeFormat.forPattern(timeFormat).withZone(timeZone);
DateTime endDateTime = DateTime.parse(String.valueOf(endTime),
inputDataDateTimeFormatter);
Period oneBucket = datasetConfig.bucketTimeGranularity().toPeriod();
maxTime = endDateTime.plus(oneBucket).getMillis() - 1;
@@ -106,11 +112,4 @@ public class PinotDataSourceMaxTime {
}
return maxTime;
}
-
- private long getPrevMaxDataTime(String collection) {
- if (this.collectionToPrevMaxDataTimeMap.containsKey(collection)) {
- return collectionToPrevMaxDataTimeMap.get(collection);
- }
- return 0;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]