[ 
https://issues.apache.org/jira/browse/TAJO-1493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14900289#comment-14900289
 ] 

ASF GitHub Bot commented on TAJO-1493:
--------------------------------------

Github user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/772#discussion_r39944783
  
    --- Diff: 
tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
 ---
    @@ -2228,6 +2244,257 @@ private void setPartitionKeys(int pid, 
PartitionDescProto.Builder partitionDesc)
         return partitions;
       }
     
    +  /**
    +   * Check if list of partitions exist on catalog.
    +   *
    +   *
    +   * @param databaseId
    +   * @param tableId
    +   * @return
    +   */
    +  public boolean existPartitionsOnCatalog(int tableId) {
    +    Connection conn = null;
    +    ResultSet res = null;
    +    PreparedStatement pstmt = null;
    +    boolean result = false;
    +
    +    try {
    +      String sql = "SELECT COUNT(*) CNT FROM "
    +        + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ?  ";
    +
    +      if (LOG.isDebugEnabled()) {
    +        LOG.debug(sql);
    +      }
    +
    +      conn = getConnection();
    +      pstmt = conn.prepareStatement(sql);
    +      pstmt.setInt(1, tableId);
    +      res = pstmt.executeQuery();
    +
    +      if (res.next()) {
    +        if (res.getInt("CNT") > 0) {
    +          result = true;
    +        }
    +      }
    +    } catch (SQLException se) {
    +      throw new TajoInternalError(se);
    +    } finally {
    +      CatalogUtil.closeQuietly(pstmt, res);
    +    }
    +    return result;
    +  }
    +
    +  @Override
    +  public List<PartitionDescProto> 
getPartitionsByFilter(PartitionsByFilterProto request) {
    +    throw new TajoRuntimeException(new UnsupportedException());
    +  }
    +
    +  @Override
    +  public List<PartitionDescProto> 
getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws
    +      UndefinedDatabaseException, UndefinedTableException, 
UndefinedPartitionMethodException,
    +      PartitionNotFoundException, UnsupportedException {
    +    Connection conn = null;
    +    PreparedStatement pstmt = null;
    +    ResultSet res = null;
    +    int currentIndex = 1;
    +    String selectStatement = null;
    +
    +    List<PartitionDescProto> partitions = TUtil.newList();
    +    List<PartitionFilterSet> filterSets = TUtil.newList();
    +
    +    int databaseId = getDatabaseId(request.getDatabaseName());
    +    int tableId = getTableId(databaseId, request.getDatabaseName(), 
request.getTableName());
    +    if (!existPartitionMethod(request.getDatabaseName(), 
request.getTableName())) {
    +      throw new UndefinedPartitionMethodException(request.getTableName());
    +    }
    +
    +    if (!existPartitionsOnCatalog(tableId)) {
    +      throw new PartitionNotFoundException(request.getTableName());
    +    }
    +
    +    try {
    +      TableDescProto tableDesc = getTable(request.getDatabaseName(), 
request.getTableName());
    +
    +      selectStatement = 
getSelectStatementForPartitions(tableDesc.getTableName(), 
tableDesc.getPartition()
    +        .getExpressionSchema().getFieldsList(), request.getAlgebra(), 
filterSets);
    +
    +      conn = getConnection();
    +      pstmt = conn.prepareStatement(selectStatement);
    +
    +      // Set table id by force because first parameter of all direct sql 
is table id
    +      pstmt.setInt(currentIndex, tableId);
    +      currentIndex++;
    +
    +      for (PartitionFilterSet filter : filterSets) {
    +        // Set table id by force because all filters have table id as 
first parameter.
    +        pstmt.setInt(currentIndex, tableId);
    +        currentIndex++;
    +
    +        for (Pair<Type, Object> parameter : filter.getParameters()) {
    +          switch (parameter.getFirst()) {
    +            case BOOLEAN:
    +              pstmt.setBoolean(currentIndex, 
(Boolean)parameter.getSecond());
    +              break;
    +            case INT8:
    +              pstmt.setLong(currentIndex, (Long) parameter.getSecond());
    +              break;
    +            case FLOAT8:
    +              pstmt.setDouble(currentIndex, (Double) 
parameter.getSecond());
    +              break;
    +            case DATE:
    +              pstmt.setDate(currentIndex, (Date) parameter.getSecond());
    +              break;
    +            case TIMESTAMP:
    +              pstmt.setTimestamp(currentIndex, (Timestamp) 
parameter.getSecond());
    +              break;
    +            case TIME:
    +              pstmt.setTime(currentIndex, (Time) parameter.getSecond());
    +              break;
    +            default:
    +              pstmt.setString(currentIndex, (String) 
parameter.getSecond());
    +              break;
    +          }
    +          currentIndex++;
    +        }
    +      }
    +
    +      res = pstmt.executeQuery();
    +
    +      while (res.next()) {
    +        PartitionDescProto.Builder builder = 
PartitionDescProto.newBuilder();
    +
    +        builder.setId(res.getInt(COL_PARTITIONS_PK));
    +        builder.setPartitionName(res.getString("PARTITION_NAME"));
    +        builder.setPath(res.getString("PATH"));
    +        builder.setNumBytes(res.getLong(COL_PARTITION_BYTES));
    +        builder.setNumFiles(res.getLong(COL_PARTITION_FILES));
    +
    +        partitions.add(builder.build());
    +      }
    +    } catch (TajoException se) {
    +      throw new TajoInternalError(se);
    +    } catch (SQLException se) {
    +      throw new TajoInternalError(se);
    +    } finally {
    +      CatalogUtil.closeQuietly(pstmt, res);
    +    }
    +
    +    return partitions;
    +  }
    +
    +  /**
    +   * Create a select statement and parameters for querying partitions and 
partition keys in CatalogStore.
    +   *
    +   * For example, consider you have a partitioned table for three columns 
(i.e., col1, col2, col3).
    +   * Assume that an user gives a condition WHERE (col1 ='1' or col1 = 
'100') and col3 > 20.
    +   * There is no filter condition corresponding to col2.
    +   *
    +   * Then, the sql would be generated as following:
    +   *
    +   *  SELECT A.PARTITION_ID, A.PARTITION_NAME, A.PATH FROM PARTITIONS A
    +   *  WHERE A.TID = ?
    +   *  AND A.PARTITION_ID IN (
    +   *    SELECT T1.PARTITION_ID FROM PARTITION_KEYS T1
    +   *    JOIN PARTITION_KEYS T2 ON T1.TID=T2.TID AND T1.PARTITION_ID = 
T2.PARTITION_ID AND T2.TID = ?
    +   *    AND ( T2.COLUMN_NAME = 'col2' AND T2.PARTITION_VALUE IS NOT NULL )
    +   *    JOIN PARTITION_KEYS T3 ON T1.TID=T3.TID AND T1.PARTITION_ID = 
T3.PARTITION_ID AND T3.TID = ?
    +   *    AND ( T3.COLUMN_NAME = 'col3' AND T3.PARTITION_VALUE > ? )
    +   *    WHERE T1.TID = ? AND ( T1.COLUMN_NAME = 'col1' AND 
T1.PARTITION_VALUE = ? )
    +   *    OR ( T1.COLUMN_NAME = 'col1' AND T1.PARTITION_VALUE = ? )
    +   )
    +   *
    +   * @param partitionColumns
    +   * @param json
    +   * @param filterSets
    +   * @return
    +   * @throws TajoException
    +   * @throws SQLException
    +   */
    +  private String getSelectStatementForPartitions(String tableName, 
List<ColumnProto> partitionColumns, String json,
    +    List<PartitionFilterSet> filterSets) throws TajoException {
    --- End diff --
    
    At the very least, the Javadoc should state that ```filterSets``` is an 
output parameter, i.e., it is filled in as one of the results of this method.


> Make partition pruning based on catalog informations
> ----------------------------------------------------
>
>                 Key: TAJO-1493
>                 URL: https://issues.apache.org/jira/browse/TAJO-1493
>             Project: Tajo
>          Issue Type: Sub-task
>          Components: Catalog, Planner/Optimizer
>            Reporter: Jaehwa Jung
>            Assignee: Jaehwa Jung
>             Fix For: 0.11.0, 0.12.0
>
>         Attachments: TAJO-1493.patch, TAJO-1493_2.patch, TAJO-1493_3.patch, 
> TAJO-1493_4.patch
>
>
> Currently, PartitionedTableRewriter looks into the partition directories 
> to rewrite filter conditions. It lists all subdirectories of the table path 
> because the catalog doesn’t provide partition directories. But if there are 
> many subdirectories on HDFS (for example, more than 10,000), this might 
> overload the NameNode. Thus, CatalogStore needs to provide the partition 
> directories for the specified filter conditions. I designed a new method 
> for CatalogStore as follows:
> * method name: getPartitionsWithConditionFilters
> * first parameter: database name
> * second parameter: table name
> * third parameter: where clause (including the target column name and 
> partition value)
> * return values: 
> List<org.apache.tajo.catalog.proto.CatalogProtos.TablePartitionProto>
> * description: It scans CatalogStore for the right partition directories 
> using the where clause. 
>   For example, users set the parameters as follows:
> ** first parameter: default
> ** second parameter: table1
> ** third parameter: COLUMN_NAME = 'col1' AND PARTITION_VALUE = '3'
> In this case, the method will create a select statement as follows.
> {code:xml}
> SELECT DISTINCT A.PATH
> FROM PARTITIONS A, (
>   SELECT B.PARTITION_ID
>   FROM PARTITION_KEYS B
>   WHERE B.PARTITION_ID > 0 
>   AND (
>     COLUMN_NAME = 'col1' AND PARTITION_VALUE = '3'
>   )
> ) B
> WHERE A.PARTITION_ID > 0
> AND A.TID = ${table_id}
> AND A.PARTITION_ID = B.PARTITION_ID
> {code}
> At first, I considered using EvalNode instead of a where clause. But 
> I can’t use it because of circular dependency problems between the 
> tajo-catalog module and the tajo-plan module. So, I’ll implement a utility 
> class to convert EvalNode to SQL.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to