[
https://issues.apache.org/jira/browse/TAJO-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14900289#comment-14900289
]
ASF GitHub Bot commented on TAJO-1493:
--------------------------------------
Github user jihoonson commented on a diff in the pull request:
https://github.com/apache/tajo/pull/772#discussion_r39944783
--- Diff:
tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
---
@@ -2228,6 +2244,257 @@ private void setPartitionKeys(int pid,
PartitionDescProto.Builder partitionDesc)
return partitions;
}
+ /**
+ * Check if list of partitions exist on catalog.
+ *
+ *
+ * @param databaseId
+ * @param tableId
+ * @return
+ */
+ public boolean existPartitionsOnCatalog(int tableId) {
+ Connection conn = null;
+ ResultSet res = null;
+ PreparedStatement pstmt = null;
+ boolean result = false;
+
+ try {
+ String sql = "SELECT COUNT(*) CNT FROM "
+ + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ? ";
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(sql);
+ }
+
+ conn = getConnection();
+ pstmt = conn.prepareStatement(sql);
+ pstmt.setInt(1, tableId);
+ res = pstmt.executeQuery();
+
+ if (res.next()) {
+ if (res.getInt("CNT") > 0) {
+ result = true;
+ }
+ }
+ } catch (SQLException se) {
+ throw new TajoInternalError(se);
+ } finally {
+ CatalogUtil.closeQuietly(pstmt, res);
+ }
+ return result;
+ }
+
+ @Override
+ public List<PartitionDescProto>
getPartitionsByFilter(PartitionsByFilterProto request) {
+ throw new TajoRuntimeException(new UnsupportedException());
+ }
+
+ @Override
+ public List<PartitionDescProto>
getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws
+ UndefinedDatabaseException, UndefinedTableException,
UndefinedPartitionMethodException,
+ PartitionNotFoundException, UnsupportedException {
+ Connection conn = null;
+ PreparedStatement pstmt = null;
+ ResultSet res = null;
+ int currentIndex = 1;
+ String selectStatement = null;
+
+ List<PartitionDescProto> partitions = TUtil.newList();
+ List<PartitionFilterSet> filterSets = TUtil.newList();
+
+ int databaseId = getDatabaseId(request.getDatabaseName());
+ int tableId = getTableId(databaseId, request.getDatabaseName(),
request.getTableName());
+ if (!existPartitionMethod(request.getDatabaseName(),
request.getTableName())) {
+ throw new UndefinedPartitionMethodException(request.getTableName());
+ }
+
+ if (!existPartitionsOnCatalog(tableId)) {
+ throw new PartitionNotFoundException(request.getTableName());
+ }
+
+ try {
+ TableDescProto tableDesc = getTable(request.getDatabaseName(),
request.getTableName());
+
+ selectStatement =
getSelectStatementForPartitions(tableDesc.getTableName(),
tableDesc.getPartition()
+ .getExpressionSchema().getFieldsList(), request.getAlgebra(),
filterSets);
+
+ conn = getConnection();
+ pstmt = conn.prepareStatement(selectStatement);
+
+ // Set table id by force because first parameter of all direct sql
is table id
+ pstmt.setInt(currentIndex, tableId);
+ currentIndex++;
+
+ for (PartitionFilterSet filter : filterSets) {
+ // Set table id by force because all filters have table id as
first parameter.
+ pstmt.setInt(currentIndex, tableId);
+ currentIndex++;
+
+ for (Pair<Type, Object> parameter : filter.getParameters()) {
+ switch (parameter.getFirst()) {
+ case BOOLEAN:
+ pstmt.setBoolean(currentIndex,
(Boolean)parameter.getSecond());
+ break;
+ case INT8:
+ pstmt.setLong(currentIndex, (Long) parameter.getSecond());
+ break;
+ case FLOAT8:
+ pstmt.setDouble(currentIndex, (Double)
parameter.getSecond());
+ break;
+ case DATE:
+ pstmt.setDate(currentIndex, (Date) parameter.getSecond());
+ break;
+ case TIMESTAMP:
+ pstmt.setTimestamp(currentIndex, (Timestamp)
parameter.getSecond());
+ break;
+ case TIME:
+ pstmt.setTime(currentIndex, (Time) parameter.getSecond());
+ break;
+ default:
+ pstmt.setString(currentIndex, (String)
parameter.getSecond());
+ break;
+ }
+ currentIndex++;
+ }
+ }
+
+ res = pstmt.executeQuery();
+
+ while (res.next()) {
+ PartitionDescProto.Builder builder =
PartitionDescProto.newBuilder();
+
+ builder.setId(res.getInt(COL_PARTITIONS_PK));
+ builder.setPartitionName(res.getString("PARTITION_NAME"));
+ builder.setPath(res.getString("PATH"));
+ builder.setNumBytes(res.getLong(COL_PARTITION_BYTES));
+ builder.setNumFiles(res.getLong(COL_PARTITION_FILES));
+
+ partitions.add(builder.build());
+ }
+ } catch (TajoException se) {
+ throw new TajoInternalError(se);
+ } catch (SQLException se) {
+ throw new TajoInternalError(se);
+ } finally {
+ CatalogUtil.closeQuietly(pstmt, res);
+ }
+
+ return partitions;
+ }
+
+ /**
+ * Create a select statement and parameters for querying partitions and
partition keys in CatalogStore.
+ *
+ * For example, consider you have a partitioned table for three columns
(i.e., col1, col2, col3).
+ * Assume that an user gives a condition WHERE (col1 ='1' or col1 =
'100') and col3 > 20.
+ * There is no filter condition corresponding to col2.
+ *
+ * Then, the sql would be generated as following:
+ *
+ * SELECT A.PARTITION_ID, A.PARTITION_NAME, A.PATH FROM PARTITIONS A
+ * WHERE A.TID = ?
+ * AND A.PARTITION_ID IN (
+ * SELECT T1.PARTITION_ID FROM PARTITION_KEYS T1
+ * JOIN PARTITION_KEYS T2 ON T1.TID=T2.TID AND T1.PARTITION_ID =
T2.PARTITION_ID AND T2.TID = ?
+ * AND ( T2.COLUMN_NAME = 'col2' AND T2.PARTITION_VALUE IS NOT NULL )
+ * JOIN PARTITION_KEYS T3 ON T1.TID=T3.TID AND T1.PARTITION_ID =
T3.PARTITION_ID AND T3.TID = ?
+ * AND ( T3.COLUMN_NAME = 'col3' AND T3.PARTITION_VALUE > ? )
+ * WHERE T1.TID = ? AND ( T1.COLUMN_NAME = 'col1' AND
T1.PARTITION_VALUE = ? )
+ * OR ( T1.COLUMN_NAME = 'col1' AND T1.PARTITION_VALUE = ? )
+ )
+ *
+ * @param partitionColumns
+ * @param json
+ * @param filterSets
+ * @return
+ * @throws TajoException
+ * @throws SQLException
+ */
+ private String getSelectStatementForPartitions(String tableName,
List<ColumnProto> partitionColumns, String json,
+ List<PartitionFilterSet> filterSets) throws TajoException {
--- End diff --
At least, you should specify that ```filterSets``` is one of the results of
this method.
> Make partition pruning based on catalog information
> ---------------------------------------------------
>
> Key: TAJO-1493
> URL: https://issues.apache.org/jira/browse/TAJO-1493
> Project: Tajo
> Issue Type: Sub-task
> Components: Catalog, Planner/Optimizer
> Reporter: Jaehwa Jung
> Assignee: Jaehwa Jung
> Fix For: 0.11.0, 0.12.0
>
> Attachments: TAJO-1493.patch, TAJO-1493_2.patch, TAJO-1493_3.patch,
> TAJO-1493_4.patch
>
>
> Currently, PartitionedTableRewriter takes a look into partition directories
> for rewriting filter conditions. It gets all sub directories of the table path
> because the catalog doesn’t provide partition directories. But if there are lots
> of sub directories on HDFS, such as more than 10,000 directories, it might
> cause overload on the NameNode. Thus, CatalogStore needs to provide partition
> directories for specified filter conditions. I designed a new method for
> CatalogStore as follows:
> * method name: getPartitionsWithConditionFilters
> * first parameter: database name
> * second parameter: table name
> * third parameter: where clause (included target column name and partition
> value)
> * return values:
> List<org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto>
> * description: It scans the right partition directories on CatalogStore with the
> specified where clause.
> For examples, users set parameters as following:
> ** first parameter: default
> ** second parameter: table1
> ** third parameter: COLUMN_NAME = 'col1' AND PARTITION_VALUE = '3'
> In the previous case, this method will create a select clause as follows.
> {code:xml}
> SELECT DISTINCT A.PATH
> FROM PARTITIONS A, (
> SELECT B.PARTITION_ID
> FROM PARTITION_KEYS B
> WHERE B.PARTITION_ID > 0
> AND (
> COLUMN_NAME = 'col1' AND PARTITION_VALUE = '3'
> )
> ) B
> WHERE A.PARTITION_ID > 0
> AND A.TID = ${table_id}
> AND A.PARTITION_ID = B.PARTITION_ID
> {code}
> At first, I considered using EvalNode instead of a where clause. But
> I can’t use it because of circular-dependency problems between the tajo-catalog
> module and the tajo-plan module. So, I’ll implement a utility class to convert
> EvalNode to SQL.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)