apucher closed pull request #3598: [TE] datasource - correction for maxtime offset in pinot datasource
URL: https://github.com/apache/incubator-pinot/pull/3598
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
index a5ffaf36ea..bcf61276cd 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/api/TimeGranularity.java
@@ -19,6 +19,8 @@
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.DurationFieldType;
 import org.joda.time.Period;
 import org.joda.time.PeriodType;
@@ -76,6 +78,29 @@ public long toMillis(long number) {
     return unit.toMillis(number * size);
   }
 
+  /**
+   * Returns the equivalent milliseconds of the specified number of this time granularity,
+   * given a time zone.
+   *
+   * @param number the specified number of this time granularity.
+   * @param timeZone the time zone to base the timestamp off of
+   * @return the timestamp in millis
+   */
+  public long toMillis(long number, DateTimeZone timeZone) {
+    if (number > Integer.MAX_VALUE) {
+      switch (this.getUnit()) {
+        case MILLISECONDS:
+          return number;
+        case SECONDS:
+          return number * 1000;
+        default:
+          throw new IllegalArgumentException("epoch offset too large");
+      }
+    }
+
+    return new DateTime(0, timeZone).plus(this.toPeriod((int) number)).getMillis();
+  }
+
   /**
    * Returns an equivalent Period object of this time granularity.
    *
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
index 28a683e568..a5f7b25800 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceDimensionFilters.java
@@ -71,7 +71,7 @@ public PinotDataSourceDimensionFilters(PinotThirdEyeDataSource pinotThirdEyeData
       // left blank
     }
 
-    DateTime endDateTime = new DateTime(maxTime);
+    DateTime endDateTime = new DateTime(maxTime).plusMillis(1);
     DateTime startDateTime = endDateTime.minusDays(7);
 
     Map<String, List<String>> filters = null;
diff --git a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
index b4a035ec86..4f728a42a0 100644
--- a/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
+++ b/thirdeye/thirdeye-pinot/src/main/java/com/linkedin/thirdeye/datasource/pinot/PinotDataSourceMaxTime.java
@@ -26,8 +26,10 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 import org.joda.time.Period;
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
@@ -41,7 +43,7 @@
   private static final Logger LOGGER = LoggerFactory.getLogger(PinotDataSourceMaxTime.class);
   private static final DAORegistry DAO_REGISTRY = DAORegistry.getInstance();
 
-  private final static String COLLECTION_MAX_TIME_QUERY_TEMPLATE = "SELECT max(%s) FROM %s WHERE %s >= %s";
+  private final static String COLLECTION_MAX_TIME_QUERY_TEMPLATE = "SELECT max(%s) FROM %s WHERE %s";
 
   private final Map<String, Long> collectionToPrevMaxDataTimeMap = new ConcurrentHashMap<String, Long>();
   private final PinotThirdEyeDataSource pinotThirdEyeDataSource;
@@ -63,9 +65,11 @@ public long getMaxDateTime(String dataset) {
       // By default, query only offline, unless dataset has been marked as realtime
       String tableName = ThirdEyeUtils.computeTableName(dataset);
       TimeSpec timeSpec = ThirdEyeUtils.getTimestampTimeSpecFromDatasetConfig(datasetConfig);
-      long prevMaxDataTime = getPrevMaxDataTime(dataset);
-      String maxTimePql = String.format(COLLECTION_MAX_TIME_QUERY_TEMPLATE, timeSpec.getColumnName(), tableName,
-          timeSpec.getColumnName(), prevMaxDataTime);
+
+      long cutoffTime = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(1);
+      String timeClause = PqlUtils.getBetweenClause(new DateTime(0, DateTimeZone.UTC), new DateTime(cutoffTime, DateTimeZone.UTC), timeSpec, dataset);
+
+      String maxTimePql = String.format(COLLECTION_MAX_TIME_QUERY_TEMPLATE, timeSpec.getColumnName(), tableName, timeClause);
       PinotQuery maxTimePinotQuery = new PinotQuery(maxTimePql, tableName);
 
       ThirdEyeResultSetGroup resultSetGroup;
@@ -83,15 +87,17 @@ public long getMaxDateTime(String dataset) {
         LOGGER.error("Failed to get latest max time for dataset {} with PQL: {}", tableName, maxTimePinotQuery.getPql());
         this.collectionToPrevMaxDataTimeMap.remove(dataset);
       } else {
+        DateTimeZone timeZone = Utils.getDataTimeZone(dataset);
+
         long endTime = new Double(resultSetGroup.get(0).getDouble(0)).longValue();
         this.collectionToPrevMaxDataTimeMap.put(dataset, endTime);
         // endTime + 1 to make sure we cover the time range of that time value.
         String timeFormat = timeSpec.getFormat();
         if (StringUtils.isBlank(timeFormat) || TimeSpec.SINCE_EPOCH_FORMAT.equals(timeFormat)) {
-          maxTime = timeSpec.getDataGranularity().toMillis(endTime + 1) - 1;
+          maxTime = timeSpec.getDataGranularity().toMillis(endTime + 1, timeZone) - 1;
         } else {
           DateTimeFormatter inputDataDateTimeFormatter =
-              DateTimeFormat.forPattern(timeFormat).withZone(Utils.getDataTimeZone(dataset));
+              DateTimeFormat.forPattern(timeFormat).withZone(timeZone);
           DateTime endDateTime = DateTime.parse(String.valueOf(endTime), inputDataDateTimeFormatter);
           Period oneBucket = datasetConfig.bucketTimeGranularity().toPeriod();
           maxTime = endDateTime.plus(oneBucket).getMillis() - 1;
@@ -106,11 +112,4 @@ public long getMaxDateTime(String dataset) {
     }
     return maxTime;
   }
-
-  private long getPrevMaxDataTime(String collection) {
-    if (this.collectionToPrevMaxDataTimeMap.containsKey(collection)) {
-      return collectionToPrevMaxDataTimeMap.get(collection);
-    }
-    return 0;
-  }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to