[ 
https://issues.apache.org/jira/browse/TAJO-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033196#comment-15033196
 ] 

ASF GitHub Bot commented on TAJO-1952:
--------------------------------------

Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/846#discussion_r46244791
  
    --- Diff: 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
 ---
    @@ -550,6 +551,181 @@ public long getMinSplitSize() {
         return splits;
       }
     
    +  
////////////////////////////////////////////////////////////////////////////////
    +  // The below code is for splitting partitioned table.
    +  
////////////////////////////////////////////////////////////////////////////////
    +
    +  public PartitionFileFragment[] partitionSplit(String tableName, Path 
tablePath, String partitionKeys)
    +    throws IOException {
    +    return partitionSplit(tableName, tablePath, fs.getDefaultBlockSize(), 
partitionKeys);
    +  }
    +
    +  private PartitionFileFragment[] partitionSplit(String tableName, Path 
tablePath, long size, String partitionKeys)
    +    throws IOException {
    +    FileSystem fs = tablePath.getFileSystem(conf);
    +
    +    long defaultBlockSize = size;
    +    List<PartitionFileFragment> listTablets = new ArrayList<>();
    +    PartitionFileFragment tablet;
    +
    +    FileStatus[] fileLists = fs.listStatus(tablePath);
    +    for (FileStatus file : fileLists) {
    +      long remainFileSize = file.getLen();
    +      long start = 0;
    +      if (remainFileSize > defaultBlockSize) {
    +        while (remainFileSize > defaultBlockSize) {
    +          tablet = new PartitionFileFragment(tableName, file.getPath(), 
start, defaultBlockSize, partitionKeys);
    +          listTablets.add(tablet);
    +          start += defaultBlockSize;
    +          remainFileSize -= defaultBlockSize;
    +        }
    +        listTablets.add(new PartitionFileFragment(tableName, 
file.getPath(), start, remainFileSize, partitionKeys));
    +      } else {
    +        listTablets.add(new PartitionFileFragment(tableName, 
file.getPath(), 0, remainFileSize, partitionKeys));
    +      }
    +    }
    +
    +    PartitionFileFragment[] tablets = new 
PartitionFileFragment[listTablets.size()];
    +    listTablets.toArray(tablets);
    +
    +    return tablets;
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, 
Path file, long start, long length,
    +                                                     String[] hosts, 
String partitionKeys) {
    +    return new PartitionFileFragment(fragmentId, file, start, length, 
hosts, partitionKeys);
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, 
Path file, BlockLocation blockLocation
    +    , String partitionKeys) throws IOException {
    +    return new PartitionFileFragment(fragmentId, file, blockLocation, 
partitionKeys);
    +  }
    +
    +  protected Fragment makeNonPartitionSplit(String fragmentId, Path file, 
long start, long length,
    +                                           BlockLocation[] blkLocations, 
String partitionKeys) throws IOException {
    +
    +    Map<String, Integer> hostsBlockMap = new HashMap<>();
    +    for (BlockLocation blockLocation : blkLocations) {
    +      for (String host : blockLocation.getHosts()) {
    +        if (hostsBlockMap.containsKey(host)) {
    +          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
    +        } else {
    +          hostsBlockMap.put(host, 1);
    +        }
    +      }
    +    }
    +
    +    List<Map.Entry<String, Integer>> entries = new 
ArrayList<>(hostsBlockMap.entrySet());
    +    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() 
{
    +
    +      @Override
    +      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, 
Integer> v2) {
    +        return v1.getValue().compareTo(v2.getValue());
    +      }
    +    });
    +
    +    String[] hosts = new String[blkLocations[0].getHosts().length];
    +
    +    for (int i = 0; i < hosts.length; i++) {
    +      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) 
- i);
    +      hosts[i] = entry.getKey();
    +    }
    +
    +    return new PartitionFileFragment(fragmentId, file, start, length, 
hosts, partitionKeys);
    +  }
    +
    +  /**
    +   * Generate the list of files and make them into PartitionedFileSplits.
    +   *
    +   * @throws IOException
    +   */
    +  public List<Fragment> getPartitionSplits(String tableName, TableMeta 
meta, Schema schema, String[] partitionKeys,
    +                                           Path... inputs) throws 
IOException {
    +    // generate splits'
    +
    +    List<Fragment> splits = Lists.newArrayList();
    +    List<Fragment> volumeSplits = Lists.newArrayList();
    +    List<BlockLocation> blockLocations = Lists.newArrayList();
    +
    +    int i = 0;
    +    for (Path p : inputs) {
    +      ArrayList<FileStatus> files = Lists.newArrayList();
    +      if (fs.isFile(p)) {
    +        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
    +      } else {
    +        files.addAll(listStatus(p));
    +      }
    +
    +      int previousSplitSize = splits.size();
    +      for (FileStatus file : files) {
    +        Path path = file.getPath();
    +        long length = file.getLen();
    +        if (length > 0) {
    +          // Get locations of blocks of file
    +          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, 
length);
    +          boolean splittable = isSplittable(meta, schema, path, file);
    +          if (blocksMetadataEnabled && fs instanceof 
DistributedFileSystem) {
    +
    +            if (splittable) {
    +              for (BlockLocation blockLocation : blkLocations) {
    +                volumeSplits.add(makePartitionSplit(tableName, path, 
blockLocation, partitionKeys[i]));
    +              }
    +              blockLocations.addAll(Arrays.asList(blkLocations));
    +
    +            } else { // Non splittable
    +              long blockSize = blkLocations[0].getLength();
    +              if (blockSize >= length) {
    +                blockLocations.addAll(Arrays.asList(blkLocations));
    +                for (BlockLocation blockLocation : blkLocations) {
    +                  volumeSplits.add(makePartitionSplit(tableName, path, 
blockLocation, partitionKeys[i]));
    +                }
    +              } else {
    +                splits.add(makeNonPartitionSplit(tableName, path, 0, 
length, blkLocations, partitionKeys[i]));
    +              }
    +            }
    +
    +          } else {
    +            if (splittable) {
    +
    +              long minSize = Math.max(getMinSplitSize(), 1);
    +
    +              long blockSize = file.getBlockSize(); // s3n rest api 
contained block size but blockLocations is one
    +              long splitSize = Math.max(minSize, blockSize);
    +              long bytesRemaining = length;
    +
    +              // for s3
    +              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
    +                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    +
    +                splits.add(makePartitionSplit(tableName, path, length - 
bytesRemaining, splitSize,
    +                  blkLocations[blkIndex].getHosts(), partitionKeys[i]));
    +
    +                bytesRemaining -= splitSize;
    +              }
    +              if (bytesRemaining > 0) {
    +                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    +                splits.add(makePartitionSplit(tableName, path, length - 
bytesRemaining, bytesRemaining,
    +                  blkLocations[blkIndex].getHosts(), partitionKeys[i]));
    +              }
    +            } else { // Non splittable
    +              splits.add(makeNonPartitionSplit(tableName, path, 0, length, 
blkLocations, partitionKeys[i]));
    +            }
    +          }
    +        }
    +      }
    +      if(LOG.isDebugEnabled()){
    +        LOG.debug("# of splits per partition: " + (splits.size() - 
previousSplitSize));
    --- End diff --
    
    The log message is somewhat different from the actual meaning.
    
    ``PreviousSplitSize`` is the number of files in partition directories, and 
split is the number of total fragments. For example, there are two files in all 
partitions, and each file makes 5 fragments. Then, the total number of 
fragments is 10. In that case, 10 - 2 is jut 8. 8 is not the number of splits 
per partition.  The actual answer should be 5. In addition, we can say X per Y 
when X is the same across all Ys. It is hard to get answer because each 
partition file can be vary in terms of size. So, average is more proper here, I 
think.


> Implement PartitionFileFragment
> -------------------------------
>
>                 Key: TAJO-1952
>                 URL: https://issues.apache.org/jira/browse/TAJO-1952
>             Project: Tajo
>          Issue Type: Improvement
>          Components: Planner/Optimizer, Storage
>            Reporter: Jaehwa Jung
>            Assignee: Jaehwa Jung
>             Fix For: 0.12.0, 0.11.1
>
>         Attachments: TAJO-1952.patch
>
>
> Currently, PartitionedTableScanNode contains the list of partitions and it 
> seems to me that the list has some problems as following:
> 1. Duplicate Informs: Task contains Fragment which specify target directory 
> or target file for scanning. A path of partition lists already would write to 
> Fragment. 
> 2. Network Resource: When scanning lost of partition, it will occupy network 
> resource, for example, several hundred kilobytes or more. It looks like an 
> unnecessary resource because Fragment already has the path of partitions.
> I want to improve above problems by implementing new Fragment called 
> PartitionedFileFragment. Currently, I'm planning the implementation as 
> following:
> * PartitionedFileFragment will borrow FileFragment and it contains the 
> partition path and the partition key values.  
> * Remove the path array of partitions from PartitionedTableScanNode. 
> * Implement a method for getting filtered partition directories in 
> FileTableSpace.
> * Implement a method for making PartitionedFileFragment array.
> * Before making splits, call above method and use it for making splits.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to