GitHub user jihoonson commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/772#discussion_r39944783
  
    --- Diff: 
tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java
 ---
    @@ -2228,6 +2244,257 @@ private void setPartitionKeys(int pid, 
PartitionDescProto.Builder partitionDesc)
         return partitions;
       }
     
    +  /**
    +   * Check if list of partitions exist on catalog.
    +   *
    +   *
    +   * @param databaseId
    +   * @param tableId
    +   * @return
    +   */
    +  public boolean existPartitionsOnCatalog(int tableId) {
    +    Connection conn = null;
    +    ResultSet res = null;
    +    PreparedStatement pstmt = null;
    +    boolean result = false;
    +
    +    try {
    +      String sql = "SELECT COUNT(*) CNT FROM "
    +        + TB_PARTTIONS +" WHERE " + COL_TABLES_PK + " = ?  ";
    +
    +      if (LOG.isDebugEnabled()) {
    +        LOG.debug(sql);
    +      }
    +
    +      conn = getConnection();
    +      pstmt = conn.prepareStatement(sql);
    +      pstmt.setInt(1, tableId);
    +      res = pstmt.executeQuery();
    +
    +      if (res.next()) {
    +        if (res.getInt("CNT") > 0) {
    +          result = true;
    +        }
    +      }
    +    } catch (SQLException se) {
    +      throw new TajoInternalError(se);
    +    } finally {
    +      CatalogUtil.closeQuietly(pstmt, res);
    +    }
    +    return result;
    +  }
    +
    +  @Override
    +  public List<PartitionDescProto> 
getPartitionsByFilter(PartitionsByFilterProto request) {
    +    throw new TajoRuntimeException(new UnsupportedException());
    +  }
    +
    +  @Override
    +  public List<PartitionDescProto> 
getPartitionsByAlgebra(PartitionsByAlgebraProto request) throws
    +      UndefinedDatabaseException, UndefinedTableException, 
UndefinedPartitionMethodException,
    +      PartitionNotFoundException, UnsupportedException {
    +    Connection conn = null;
    +    PreparedStatement pstmt = null;
    +    ResultSet res = null;
    +    int currentIndex = 1;
    +    String selectStatement = null;
    +
    +    List<PartitionDescProto> partitions = TUtil.newList();
    +    List<PartitionFilterSet> filterSets = TUtil.newList();
    +
    +    int databaseId = getDatabaseId(request.getDatabaseName());
    +    int tableId = getTableId(databaseId, request.getDatabaseName(), 
request.getTableName());
    +    if (!existPartitionMethod(request.getDatabaseName(), 
request.getTableName())) {
    +      throw new UndefinedPartitionMethodException(request.getTableName());
    +    }
    +
    +    if (!existPartitionsOnCatalog(tableId)) {
    +      throw new PartitionNotFoundException(request.getTableName());
    +    }
    +
    +    try {
    +      TableDescProto tableDesc = getTable(request.getDatabaseName(), 
request.getTableName());
    +
    +      selectStatement = 
getSelectStatementForPartitions(tableDesc.getTableName(), 
tableDesc.getPartition()
    +        .getExpressionSchema().getFieldsList(), request.getAlgebra(), 
filterSets);
    +
    +      conn = getConnection();
    +      pstmt = conn.prepareStatement(selectStatement);
    +
    +      // Set table id by force because first parameter of all direct sql 
is table id
    +      pstmt.setInt(currentIndex, tableId);
    +      currentIndex++;
    +
    +      for (PartitionFilterSet filter : filterSets) {
    +        // Set table id by force because all filters have table id as 
first parameter.
    +        pstmt.setInt(currentIndex, tableId);
    +        currentIndex++;
    +
    +        for (Pair<Type, Object> parameter : filter.getParameters()) {
    +          switch (parameter.getFirst()) {
    +            case BOOLEAN:
    +              pstmt.setBoolean(currentIndex, 
(Boolean)parameter.getSecond());
    +              break;
    +            case INT8:
    +              pstmt.setLong(currentIndex, (Long) parameter.getSecond());
    +              break;
    +            case FLOAT8:
    +              pstmt.setDouble(currentIndex, (Double) 
parameter.getSecond());
    +              break;
    +            case DATE:
    +              pstmt.setDate(currentIndex, (Date) parameter.getSecond());
    +              break;
    +            case TIMESTAMP:
    +              pstmt.setTimestamp(currentIndex, (Timestamp) 
parameter.getSecond());
    +              break;
    +            case TIME:
    +              pstmt.setTime(currentIndex, (Time) parameter.getSecond());
    +              break;
    +            default:
    +              pstmt.setString(currentIndex, (String) 
parameter.getSecond());
    +              break;
    +          }
    +          currentIndex++;
    +        }
    +      }
    +
    +      res = pstmt.executeQuery();
    +
    +      while (res.next()) {
    +        PartitionDescProto.Builder builder = 
PartitionDescProto.newBuilder();
    +
    +        builder.setId(res.getInt(COL_PARTITIONS_PK));
    +        builder.setPartitionName(res.getString("PARTITION_NAME"));
    +        builder.setPath(res.getString("PATH"));
    +        builder.setNumBytes(res.getLong(COL_PARTITION_BYTES));
    +        builder.setNumFiles(res.getLong(COL_PARTITION_FILES));
    +
    +        partitions.add(builder.build());
    +      }
    +    } catch (TajoException se) {
    +      throw new TajoInternalError(se);
    +    } catch (SQLException se) {
    +      throw new TajoInternalError(se);
    +    } finally {
    +      CatalogUtil.closeQuietly(pstmt, res);
    +    }
    +
    +    return partitions;
    +  }
    +
    +  /**
    +   * Create a select statement and parameters for querying partitions and 
partition keys in CatalogStore.
    +   *
    +   * For example, consider you have a partitioned table for three columns 
(i.e., col1, col2, col3).
    +   * Assume that an user gives a condition WHERE (col1 ='1' or col1 = 
'100') and col3 > 20.
    +   * There is no filter condition corresponding to col2.
    +   *
    +   * Then, the sql would be generated as following:
    +   *
    +   *  SELECT A.PARTITION_ID, A.PARTITION_NAME, A.PATH FROM PARTITIONS A
    +   *  WHERE A.TID = ?
    +   *  AND A.PARTITION_ID IN (
    +   *    SELECT T1.PARTITION_ID FROM PARTITION_KEYS T1
    +   *    JOIN PARTITION_KEYS T2 ON T1.TID=T2.TID AND T1.PARTITION_ID = 
T2.PARTITION_ID AND T2.TID = ?
    +   *    AND ( T2.COLUMN_NAME = 'col2' AND T2.PARTITION_VALUE IS NOT NULL )
    +   *    JOIN PARTITION_KEYS T3 ON T1.TID=T3.TID AND T1.PARTITION_ID = 
T3.PARTITION_ID AND T3.TID = ?
    +   *    AND ( T3.COLUMN_NAME = 'col3' AND T3.PARTITION_VALUE > ? )
    +   *    WHERE T1.TID = ? AND ( T1.COLUMN_NAME = 'col1' AND 
T1.PARTITION_VALUE = ? )
    +   *    OR ( T1.COLUMN_NAME = 'col1' AND T1.PARTITION_VALUE = ? )
    +   )
    +   *
    +   * @param partitionColumns
    +   * @param json
    +   * @param filterSets
    +   * @return
    +   * @throws TajoException
    +   * @throws SQLException
    +   */
    +  private String getSelectStatementForPartitions(String tableName, 
List<ColumnProto> partitionColumns, String json,
    +    List<PartitionFilterSet> filterSets) throws TajoException {
    --- End diff --
    
    At a minimum, the Javadoc should state that `filterSets` is an output
    parameter: this method populates it, so it is one of the method's results.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wants it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to