[ 
https://issues.apache.org/jira/browse/TAJO-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033199#comment-15033199
 ] 

ASF GitHub Bot commented on TAJO-1952:
--------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/846#discussion_r46244839
  
    --- Diff: 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
 ---
    @@ -550,6 +551,181 @@ public long getMinSplitSize() {
         return splits;
       }
     
    +  
////////////////////////////////////////////////////////////////////////////////
    +  // The below code is for splitting partitioned table.
    +  
////////////////////////////////////////////////////////////////////////////////
    +
    +  public PartitionFileFragment[] partitionSplit(String tableName, Path 
tablePath, String partitionKeys)
    +    throws IOException {
    +    return partitionSplit(tableName, tablePath, fs.getDefaultBlockSize(), 
partitionKeys);
    +  }
    +
    +  private PartitionFileFragment[] partitionSplit(String tableName, Path 
tablePath, long size, String partitionKeys)
    +    throws IOException {
    +    FileSystem fs = tablePath.getFileSystem(conf);
    +
    +    long defaultBlockSize = size;
    +    List<PartitionFileFragment> listTablets = new ArrayList<>();
    +    PartitionFileFragment tablet;
    +
    +    FileStatus[] fileLists = fs.listStatus(tablePath);
    +    for (FileStatus file : fileLists) {
    +      long remainFileSize = file.getLen();
    +      long start = 0;
    +      if (remainFileSize > defaultBlockSize) {
    +        while (remainFileSize > defaultBlockSize) {
    +          tablet = new PartitionFileFragment(tableName, file.getPath(), 
start, defaultBlockSize, partitionKeys);
    +          listTablets.add(tablet);
    +          start += defaultBlockSize;
    +          remainFileSize -= defaultBlockSize;
    +        }
    +        listTablets.add(new PartitionFileFragment(tableName, 
file.getPath(), start, remainFileSize, partitionKeys));
    +      } else {
    +        listTablets.add(new PartitionFileFragment(tableName, 
file.getPath(), 0, remainFileSize, partitionKeys));
    +      }
    +    }
    +
    +    PartitionFileFragment[] tablets = new 
PartitionFileFragment[listTablets.size()];
    +    listTablets.toArray(tablets);
    +
    +    return tablets;
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, 
Path file, long start, long length,
    +                                                     String[] hosts, 
String partitionKeys) {
    +    return new PartitionFileFragment(fragmentId, file, start, length, 
hosts, partitionKeys);
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, 
Path file, BlockLocation blockLocation
    +    , String partitionKeys) throws IOException {
    +    return new PartitionFileFragment(fragmentId, file, blockLocation, 
partitionKeys);
    +  }
    +
    +  protected Fragment makeNonPartitionSplit(String fragmentId, Path file, 
long start, long length,
    +                                           BlockLocation[] blkLocations, 
String partitionKeys) throws IOException {
    +
    +    Map<String, Integer> hostsBlockMap = new HashMap<>();
    +    for (BlockLocation blockLocation : blkLocations) {
    +      for (String host : blockLocation.getHosts()) {
    +        if (hostsBlockMap.containsKey(host)) {
    +          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
    +        } else {
    +          hostsBlockMap.put(host, 1);
    +        }
    +      }
    +    }
    +
    +    List<Map.Entry<String, Integer>> entries = new 
ArrayList<>(hostsBlockMap.entrySet());
    +    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() 
{
    +
    +      @Override
    +      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, 
Integer> v2) {
    +        return v1.getValue().compareTo(v2.getValue());
    +      }
    +    });
    +
    +    String[] hosts = new String[blkLocations[0].getHosts().length];
    +
    +    for (int i = 0; i < hosts.length; i++) {
    +      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) 
- i);
    +      hosts[i] = entry.getKey();
    +    }
    +
    +    return new PartitionFileFragment(fragmentId, file, start, length, 
hosts, partitionKeys);
    +  }
    +
    +  /**
    +   * Generate the list of files and make them into PartitionedFileSplits.
    +   *
    +   * @throws IOException
    +   */
    +  public List<Fragment> getPartitionSplits(String tableName, TableMeta 
meta, Schema schema, String[] partitionKeys,
    +                                           Path... inputs) throws 
IOException {
    +    // generate splits'
    +
    +    List<Fragment> splits = Lists.newArrayList();
    +    List<Fragment> volumeSplits = Lists.newArrayList();
    +    List<BlockLocation> blockLocations = Lists.newArrayList();
    +
    +    int i = 0;
    +    for (Path p : inputs) {
    +      ArrayList<FileStatus> files = Lists.newArrayList();
    +      if (fs.isFile(p)) {
    +        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
    +      } else {
    +        files.addAll(listStatus(p));
    +      }
    +
    +      int previousSplitSize = splits.size();
    --- End diff --
    
    It would be better if the ```final``` marked for ``previousSplitSize``.


> Implement PartitionFileFragment
> -------------------------------
>
>                 Key: TAJO-1952
>                 URL: https://issues.apache.org/jira/browse/TAJO-1952
>             Project: Tajo
>          Issue Type: Improvement
>          Components: Planner/Optimizer, Storage
>            Reporter: Jaehwa Jung
>            Assignee: Jaehwa Jung
>             Fix For: 0.12.0, 0.11.1
>
>         Attachments: TAJO-1952.patch
>
>
> Currently, PartitionedTableScanNode contains the list of partitions and it 
> seems to me that the list has some problems as following:
> 1. Duplicate Informs: Task contains Fragment which specify target directory 
> or target file for scanning. A path of partition lists already would write to 
> Fragment. 
> 2. Network Resource: When scanning lost of partition, it will occupy network 
> resource, for example, several hundred kilobytes or more. It looks like an 
> unnecessary resource because Fragment already has the path of partitions.
> I want to improve above problems by implementing new Fragment called 
> PartitionedFileFragment. Currently, I'm planning the implementation as 
> following:
> * PartitionedFileFragment will borrow FileFragment and it contains the 
> partition path and the partition key values.  
> * Remove the path array of partitions from PartitionedTableScanNode. 
> * Implement a method for getting filtered partition directories in 
> FileTableSpace.
> * Implement a method for making PartitionedFileFragment array.
> * Before making splits, call above method and use it for making splits.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to