Github user hyunsik commented on a diff in the pull request:

    https://github.com/apache/tajo/pull/846#discussion_r46244791
  
    --- Diff: 
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
 ---
    @@ -550,6 +551,181 @@ public long getMinSplitSize() {
         return splits;
       }
     
    +  
////////////////////////////////////////////////////////////////////////////////
    +  // The below code is for splitting partitioned table.
    +  
////////////////////////////////////////////////////////////////////////////////
    +
    +  public PartitionFileFragment[] partitionSplit(String tableName, Path 
tablePath, String partitionKeys)
    +    throws IOException {
    +    return partitionSplit(tableName, tablePath, fs.getDefaultBlockSize(), 
partitionKeys);
    +  }
    +
    +  private PartitionFileFragment[] partitionSplit(String tableName, Path 
tablePath, long size, String partitionKeys)
    +    throws IOException {
    +    FileSystem fs = tablePath.getFileSystem(conf);
    +
    +    long defaultBlockSize = size;
    +    List<PartitionFileFragment> listTablets = new ArrayList<>();
    +    PartitionFileFragment tablet;
    +
    +    FileStatus[] fileLists = fs.listStatus(tablePath);
    +    for (FileStatus file : fileLists) {
    +      long remainFileSize = file.getLen();
    +      long start = 0;
    +      if (remainFileSize > defaultBlockSize) {
    +        while (remainFileSize > defaultBlockSize) {
    +          tablet = new PartitionFileFragment(tableName, file.getPath(), 
start, defaultBlockSize, partitionKeys);
    +          listTablets.add(tablet);
    +          start += defaultBlockSize;
    +          remainFileSize -= defaultBlockSize;
    +        }
    +        listTablets.add(new PartitionFileFragment(tableName, 
file.getPath(), start, remainFileSize, partitionKeys));
    +      } else {
    +        listTablets.add(new PartitionFileFragment(tableName, 
file.getPath(), 0, remainFileSize, partitionKeys));
    +      }
    +    }
    +
    +    PartitionFileFragment[] tablets = new 
PartitionFileFragment[listTablets.size()];
    +    listTablets.toArray(tablets);
    +
    +    return tablets;
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, 
Path file, long start, long length,
    +                                                     String[] hosts, 
String partitionKeys) {
    +    return new PartitionFileFragment(fragmentId, file, start, length, 
hosts, partitionKeys);
    +  }
    +
    +  protected PartitionFileFragment makePartitionSplit(String fragmentId, 
Path file, BlockLocation blockLocation
    +    , String partitionKeys) throws IOException {
    +    return new PartitionFileFragment(fragmentId, file, blockLocation, 
partitionKeys);
    +  }
    +
    +  protected Fragment makeNonPartitionSplit(String fragmentId, Path file, 
long start, long length,
    +                                           BlockLocation[] blkLocations, 
String partitionKeys) throws IOException {
    +
    +    Map<String, Integer> hostsBlockMap = new HashMap<>();
    +    for (BlockLocation blockLocation : blkLocations) {
    +      for (String host : blockLocation.getHosts()) {
    +        if (hostsBlockMap.containsKey(host)) {
    +          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
    +        } else {
    +          hostsBlockMap.put(host, 1);
    +        }
    +      }
    +    }
    +
    +    List<Map.Entry<String, Integer>> entries = new 
ArrayList<>(hostsBlockMap.entrySet());
    +    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() 
{
    +
    +      @Override
    +      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, 
Integer> v2) {
    +        return v1.getValue().compareTo(v2.getValue());
    +      }
    +    });
    +
    +    String[] hosts = new String[blkLocations[0].getHosts().length];
    +
    +    for (int i = 0; i < hosts.length; i++) {
    +      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) 
- i);
    +      hosts[i] = entry.getKey();
    +    }
    +
    +    return new PartitionFileFragment(fragmentId, file, start, length, 
hosts, partitionKeys);
    +  }
    +
    +  /**
    +   * Generate the list of files and make them into PartitionedFileSplits.
    +   *
    +   * @throws IOException
    +   */
    +  public List<Fragment> getPartitionSplits(String tableName, TableMeta 
meta, Schema schema, String[] partitionKeys,
    +                                           Path... inputs) throws 
IOException {
    +    // generate splits'
    +
    +    List<Fragment> splits = Lists.newArrayList();
    +    List<Fragment> volumeSplits = Lists.newArrayList();
    +    List<BlockLocation> blockLocations = Lists.newArrayList();
    +
    +    int i = 0;
    +    for (Path p : inputs) {
    +      ArrayList<FileStatus> files = Lists.newArrayList();
    +      if (fs.isFile(p)) {
    +        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
    +      } else {
    +        files.addAll(listStatus(p));
    +      }
    +
    +      int previousSplitSize = splits.size();
    +      for (FileStatus file : files) {
    +        Path path = file.getPath();
    +        long length = file.getLen();
    +        if (length > 0) {
    +          // Get locations of blocks of file
    +          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, 
length);
    +          boolean splittable = isSplittable(meta, schema, path, file);
    +          if (blocksMetadataEnabled && fs instanceof 
DistributedFileSystem) {
    +
    +            if (splittable) {
    +              for (BlockLocation blockLocation : blkLocations) {
    +                volumeSplits.add(makePartitionSplit(tableName, path, 
blockLocation, partitionKeys[i]));
    +              }
    +              blockLocations.addAll(Arrays.asList(blkLocations));
    +
    +            } else { // Non splittable
    +              long blockSize = blkLocations[0].getLength();
    +              if (blockSize >= length) {
    +                blockLocations.addAll(Arrays.asList(blkLocations));
    +                for (BlockLocation blockLocation : blkLocations) {
    +                  volumeSplits.add(makePartitionSplit(tableName, path, 
blockLocation, partitionKeys[i]));
    +                }
    +              } else {
    +                splits.add(makeNonPartitionSplit(tableName, path, 0, 
length, blkLocations, partitionKeys[i]));
    +              }
    +            }
    +
    +          } else {
    +            if (splittable) {
    +
    +              long minSize = Math.max(getMinSplitSize(), 1);
    +
    +              long blockSize = file.getBlockSize(); // s3n rest api 
contained block size but blockLocations is one
    +              long splitSize = Math.max(minSize, blockSize);
    +              long bytesRemaining = length;
    +
    +              // for s3
    +              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
    +                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    +
    +                splits.add(makePartitionSplit(tableName, path, length - 
bytesRemaining, splitSize,
    +                  blkLocations[blkIndex].getHosts(), partitionKeys[i]));
    +
    +                bytesRemaining -= splitSize;
    +              }
    +              if (bytesRemaining > 0) {
    +                int blkIndex = getBlockIndex(blkLocations, length - 
bytesRemaining);
    +                splits.add(makePartitionSplit(tableName, path, length - 
bytesRemaining, bytesRemaining,
    +                  blkLocations[blkIndex].getHosts(), partitionKeys[i]));
    +              }
    +            } else { // Non splittable
    +              splits.add(makeNonPartitionSplit(tableName, path, 0, length, 
blkLocations, partitionKeys[i]));
    +            }
    +          }
    +        }
    +      }
    +      if(LOG.isDebugEnabled()){
    +        LOG.debug("# of splits per partition: " + (splits.size() - 
previousSplitSize));
    --- End diff --
    
    The log message is somewhat different from the actual meaning.
    
    ``previousSplitSize`` is the number of files in partition directories, and 
the size of ``splits`` is the total number of fragments. For example, suppose 
there are two files across all partitions, and each file makes 5 fragments. 
Then the total number of fragments is 10. In that case, 10 - 2 is just 8, and 8 
is not the number of splits per partition. The actual answer should be 5. In 
addition, we can only say "X per Y" when X is the same across all Ys. It is 
hard to give a single answer because each partition file can vary in size. So 
an average is more appropriate here, I think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to