tangdian commented on a change in pull request #3830: [TE] SQL Connector 
backend and front end, supporting Presto, MySQL, H2, with sample data in H2
URL: https://github.com/apache/incubator-pinot/pull/3830#discussion_r271828177
 
 

 ##########
 File path: 
thirdeye/thirdeye-pinot/src/main/java/org/apache/pinot/thirdeye/datasource/sql/SqlThirdEyeDataSource.java
 ##########
 @@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.thirdeye.datasource.sql;
+
+import com.google.common.base.Preconditions;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.thirdeye.common.time.TimeSpec;
+import org.apache.pinot.thirdeye.datalayer.dto.DatasetConfigDTO;
+import org.apache.pinot.thirdeye.datasource.MetricFunction;
+import org.apache.pinot.thirdeye.datasource.RelationalThirdEyeResponse;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeResponse;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeCacheRegistry;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeDataSource;
+import org.apache.pinot.thirdeye.datasource.ThirdEyeRequest;
+import org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSet;
+import 
org.apache.pinot.thirdeye.datasource.pinot.resultset.ThirdEyeResultSetUtils;
+import org.apache.pinot.thirdeye.util.ThirdEyeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class SqlThirdEyeDataSource implements ThirdEyeDataSource {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SqlThirdEyeDataSource.class);
+  private static final ThirdEyeCacheRegistry CACHE_REGISTRY_INSTANCE = 
ThirdEyeCacheRegistry.getInstance();
+  protected LoadingCache<SqlQuery, ThirdEyeResultSet> sqlResponseCache;
+  private SqlResponseCacheLoader sqlResponseCacheLoader;
+  public static final String DATA_SOURCE_NAME = 
SqlThirdEyeDataSource.class.getSimpleName();
+
+  // TODO: make default cache size configurable
+  private static final int DEFAULT_HEAP_PERCENTAGE_FOR_RESULTSETGROUP_CACHE = 
50;
+  private static final int 
DEFAULT_LOWER_BOUND_OF_RESULTSETGROUP_CACHE_SIZE_IN_MB = 100;
+  private static final int 
DEFAULT_UPPER_BOUND_OF_RESULTSETGROUP_CACHE_SIZE_IN_MB = 8192;
+
+  public SqlThirdEyeDataSource(Map<String, Object> properties) throws 
Exception {
+    sqlResponseCacheLoader = new SqlResponseCacheLoader(properties);
+    sqlResponseCache = buildResponseCache(sqlResponseCacheLoader);
+  }
+
+  @Override
+  public String getName() {
+    return SqlThirdEyeDataSource.class.getSimpleName();
+  }
+
+  @Override
+  public ThirdEyeResponse execute(ThirdEyeRequest request) throws Exception {
+    LinkedHashMap<MetricFunction, List<ThirdEyeResultSet>> 
metricFunctionToResultSetList = new LinkedHashMap<>();
+
+    TimeSpec timeSpec = null;
+    try {
+      for (MetricFunction metricFunction : request.getMetricFunctions()) {
+        String dataset = metricFunction.getDataset();
+        DatasetConfigDTO datasetConfig = 
ThirdEyeUtils.getDatasetConfigFromName(dataset);
+        TimeSpec dataTimeSpec = 
ThirdEyeUtils.getTimestampTimeSpecFromDatasetConfig(datasetConfig);
+
+        if (timeSpec == null) {
+          timeSpec = dataTimeSpec;
+        }
+
+        String[] tableComponents = dataset.split("\\.");
+        String sourceName = tableComponents[0];
+        String dbName = tableComponents[1];
+
+        String sqlQuery = SqlUtils.getSql(request, metricFunction, 
request.getFilterSet(), dataTimeSpec, sourceName);
+        ThirdEyeResultSet thirdEyeResultSet = executeSQL(new 
SqlQuery(sqlQuery, sourceName, dbName,
+            metricFunction.getMetricName(), request.getGroupBy(), 
request.getGroupByTimeGranularity(), dataTimeSpec));
+
+        List<ThirdEyeResultSet> resultSetList = new ArrayList<>();
+        resultSetList.add(thirdEyeResultSet);
+
+        metricFunctionToResultSetList.put(metricFunction, resultSetList);
+
+      }
+      List<String[]> resultRows = 
ThirdEyeResultSetUtils.parseResultSets(request, metricFunctionToResultSetList, 
"SQL");
+
+      return new RelationalThirdEyeResponse(request, resultRows, timeSpec);
+    } catch (Exception e) {
+      throw e;
+    }
+
+  }
+
+  /**
+   * Returns the cached ResultSetGroup corresponding to the given Presto query.
+   *
+   * @param SQLQuery the query that is specifically constructed for Presto.
+   * @return the corresponding ThirdEyeResultSet to the given Presto query.
+   *
+   */
+  private ThirdEyeResultSet executeSQL(SqlQuery SQLQuery) throws Exception {
+    ThirdEyeResultSet thirdEyeResultSet;
+    try {
+      thirdEyeResultSet = sqlResponseCache.get(SQLQuery);
+    } catch (Exception e) {
+      throw e;
+    }
+    return thirdEyeResultSet;
+  }
+
+  @Override
+  public List<String> getDatasets() throws Exception {
+    return CACHE_REGISTRY_INSTANCE.getDatasetsCache().getDatasets();
+  }
+
+  @Override
+  public void clear() throws Exception {
+    // left blank
+  }
+
+  @Override
+  public void close() throws Exception {
+    // left blank
+  }
+
+  @Override
+  public long getMaxDataTime(String dataset) throws Exception {
+    LOG.info("Getting max data time for " + dataset);
+    return sqlResponseCacheLoader.getMaxDataTime(dataset);
+  }
+
+  @Override
+  public Map<String, List<String>> getDimensionFilters(String dataset) throws 
Exception {
+    LOG.info("Running dimension filters for " + dataset);
+    return this.sqlResponseCacheLoader.getDimensionFilters(dataset);
+  }
+
+
+  private static LoadingCache<SqlQuery, ThirdEyeResultSet> buildResponseCache(
 
 Review comment:
   Thanks for the suggestion. I have refactored the code and moved the 
buildResponseCache function into ThirdEyeUtils. (To reuse this function, I had 
to create a superclass called RelationalQuery for SqlQuery and PinotQuery to 
extend.)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to