[
https://issues.apache.org/jira/browse/TAJO-1952?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033196#comment-15033196
]
ASF GitHub Bot commented on TAJO-1952:
--------------------------------------
Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/846#discussion_r46244791
--- Diff: tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java ---
@@ -550,6 +551,181 @@ public long getMinSplitSize() {
     return splits;
   }
+
+  ////////////////////////////////////////////////////////////////////////////////
+  // The below code is for splitting partitioned table.
+  ////////////////////////////////////////////////////////////////////////////////
+
+  public PartitionFileFragment[] partitionSplit(String tableName, Path tablePath, String partitionKeys)
+      throws IOException {
+    return partitionSplit(tableName, tablePath, fs.getDefaultBlockSize(), partitionKeys);
+  }
+
+  private PartitionFileFragment[] partitionSplit(String tableName, Path tablePath, long size, String partitionKeys)
+      throws IOException {
+    FileSystem fs = tablePath.getFileSystem(conf);
+
+    long defaultBlockSize = size;
+    List<PartitionFileFragment> listTablets = new ArrayList<>();
+    PartitionFileFragment tablet;
+
+    FileStatus[] fileLists = fs.listStatus(tablePath);
+    for (FileStatus file : fileLists) {
+      long remainFileSize = file.getLen();
+      long start = 0;
+      if (remainFileSize > defaultBlockSize) {
+        while (remainFileSize > defaultBlockSize) {
+          tablet = new PartitionFileFragment(tableName, file.getPath(), start, defaultBlockSize, partitionKeys);
+          listTablets.add(tablet);
+          start += defaultBlockSize;
+          remainFileSize -= defaultBlockSize;
+        }
+        listTablets.add(new PartitionFileFragment(tableName, file.getPath(), start, remainFileSize, partitionKeys));
+      } else {
+        listTablets.add(new PartitionFileFragment(tableName, file.getPath(), 0, remainFileSize, partitionKeys));
+      }
+    }
+
+    PartitionFileFragment[] tablets = new PartitionFileFragment[listTablets.size()];
+    listTablets.toArray(tablets);
+
+    return tablets;
+  }
+
+  protected PartitionFileFragment makePartitionSplit(String fragmentId, Path file, long start, long length,
+                                                     String[] hosts, String partitionKeys) {
+    return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys);
+  }
+
+  protected PartitionFileFragment makePartitionSplit(String fragmentId, Path file, BlockLocation blockLocation,
+                                                     String partitionKeys) throws IOException {
+    return new PartitionFileFragment(fragmentId, file, blockLocation, partitionKeys);
+  }
+
+  protected Fragment makeNonPartitionSplit(String fragmentId, Path file, long start, long length,
+                                           BlockLocation[] blkLocations, String partitionKeys) throws IOException {
+
+    Map<String, Integer> hostsBlockMap = new HashMap<>();
+    for (BlockLocation blockLocation : blkLocations) {
+      for (String host : blockLocation.getHosts()) {
+        if (hostsBlockMap.containsKey(host)) {
+          hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+        } else {
+          hostsBlockMap.put(host, 1);
+        }
+      }
+    }
+
+    List<Map.Entry<String, Integer>> entries = new ArrayList<>(hostsBlockMap.entrySet());
+    Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() {
+
+      @Override
+      public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) {
+        return v1.getValue().compareTo(v2.getValue());
+      }
+    });
+
+    String[] hosts = new String[blkLocations[0].getHosts().length];
+
+    for (int i = 0; i < hosts.length; i++) {
+      Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i);
+      hosts[i] = entry.getKey();
+    }
+
+    return new PartitionFileFragment(fragmentId, file, start, length, hosts, partitionKeys);
+  }
+
+  /**
+   * Generate the list of files and make them into PartitionFileFragments.
+   *
+   * @throws IOException
+   */
+  public List<Fragment> getPartitionSplits(String tableName, TableMeta meta, Schema schema, String[] partitionKeys,
+                                           Path... inputs) throws IOException {
+    // generate splits
+
+    List<Fragment> splits = Lists.newArrayList();
+    List<Fragment> volumeSplits = Lists.newArrayList();
+    List<BlockLocation> blockLocations = Lists.newArrayList();
+
+    int i = 0;
+    for (Path p : inputs) {
+      ArrayList<FileStatus> files = Lists.newArrayList();
+      if (fs.isFile(p)) {
+        files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+      } else {
+        files.addAll(listStatus(p));
+      }
+
+      int previousSplitSize = splits.size();
+      for (FileStatus file : files) {
+        Path path = file.getPath();
+        long length = file.getLen();
+        if (length > 0) {
+          // Get locations of blocks of file
+          BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+          boolean splittable = isSplittable(meta, schema, path, file);
+          if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) {
+
+            if (splittable) {
+              for (BlockLocation blockLocation : blkLocations) {
+                volumeSplits.add(makePartitionSplit(tableName, path, blockLocation, partitionKeys[i]));
+              }
+              blockLocations.addAll(Arrays.asList(blkLocations));
+
+            } else { // Non splittable
+              long blockSize = blkLocations[0].getLength();
+              if (blockSize >= length) {
+                blockLocations.addAll(Arrays.asList(blkLocations));
+                for (BlockLocation blockLocation : blkLocations) {
+                  volumeSplits.add(makePartitionSplit(tableName, path, blockLocation, partitionKeys[i]));
+                }
+              } else {
+                splits.add(makeNonPartitionSplit(tableName, path, 0, length, blkLocations, partitionKeys[i]));
+              }
+            }
+
+          } else {
+            if (splittable) {
+
+              long minSize = Math.max(getMinSplitSize(), 1);
+
+              long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one
+              long splitSize = Math.max(minSize, blockSize);
+              long bytesRemaining = length;
+
+              // for s3
+              while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+
+                splits.add(makePartitionSplit(tableName, path, length - bytesRemaining, splitSize,
+                    blkLocations[blkIndex].getHosts(), partitionKeys[i]));
+
+                bytesRemaining -= splitSize;
+              }
+              if (bytesRemaining > 0) {
+                int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining);
+                splits.add(makePartitionSplit(tableName, path, length - bytesRemaining, bytesRemaining,
+                    blkLocations[blkIndex].getHosts(), partitionKeys[i]));
+              }
+            } else { // Non splittable
+              splits.add(makeNonPartitionSplit(tableName, path, 0, length, blkLocations, partitionKeys[i]));
+            }
+          }
+        }
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize));
--- End diff --
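For reference, the fragment-cutting arithmetic in ``partitionSplit`` above can be sketched independently of the HDFS types. The helper below is a hypothetical stand-in that records the same (start, length) pairs the loop produces:

```java
import java.util.ArrayList;
import java.util.List;

public class SplitSketch {
    // Hypothetical stand-in for the loop in partitionSplit: cut a file of
    // fileLen bytes into blockSize-sized fragments plus one remainder.
    // Each entry is a {start, length} pair.
    static List<long[]> cut(long fileLen, long blockSize) {
        List<long[]> fragments = new ArrayList<>();
        long start = 0;
        long remaining = fileLen;
        while (remaining > blockSize) {
            fragments.add(new long[]{start, blockSize});
            start += blockSize;
            remaining -= blockSize;
        }
        // Tail fragment, or the whole file when fileLen <= blockSize.
        fragments.add(new long[]{start, remaining});
        return fragments;
    }

    public static void main(String[] args) {
        // A 250-byte file with 100-byte blocks yields fragments
        // (0,100), (100,100), (200,50).
        for (long[] f : cut(250, 100)) {
            System.out.println(f[0] + "," + f[1]);
        }
    }
}
```

Note the patch never emits a zero-length tail: the `else` branch handles files at or below one block, so a file exactly one block long becomes a single fragment.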
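The host-selection step in ``makeNonPartitionSplit`` (count how many blocks each host holds, sort ascending, then read entries from the tail) is equivalent to a single descending sort. A minimal standalone sketch of that idea, with hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HostRanking {
    // Hypothetical sketch of the host-selection step: count block replicas
    // per host, then return the k hosts holding the most blocks.
    static String[] topHosts(List<String[]> blockHosts, int k) {
        Map<String, Integer> counts = new HashMap<>();
        for (String[] hosts : blockHosts) {
            for (String h : hosts) {
                counts.merge(h, 1, Integer::sum);   // same as the containsKey/put dance
            }
        }
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        // Descending sort replaces "ascending sort + read from the end".
        entries.sort(Map.Entry.comparingByValue(Comparator.reverseOrder()));
        String[] top = new String[k];
        for (int i = 0; i < k; i++) {
            top[i] = entries.get(i).getKey();
        }
        return top;
    }

    public static void main(String[] args) {
        // Three blocks: "a" holds all three replica slots shown, "b" two, "c" one.
        List<String[]> blocks = Arrays.asList(
            new String[]{"a", "b"},
            new String[]{"a", "c"},
            new String[]{"a", "b"});
        System.out.println(Arrays.toString(topHosts(blocks, 2)));
    }
}
```

This biases the fragment's host list toward the nodes that hold the most of the file's blocks, which is what the scheduler wants for locality.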
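The ``SPLIT_SLOP`` loop in ``getPartitionSplits`` stops cutting once the remainder is within the slop factor of one split, so the remainder becomes a single, slightly oversized final split rather than a full split plus a tiny one. A sketch of just the sizing arithmetic; the 1.1 slop value follows the Hadoop ``FileInputFormat`` convention and is an assumption here:

```java
public class SlopSketch {
    // Assumed value, per the Hadoop convention: a remainder of up to 10%
    // beyond one splitSize is kept as one final split.
    static final double SPLIT_SLOP = 1.1;

    // Count how many splits the loop in getPartitionSplits would emit.
    static int countSplits(long length, long splitSize) {
        int n = 0;
        long bytesRemaining = length;
        while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
            bytesRemaining -= splitSize;
            n++;
        }
        if (bytesRemaining > 0) {
            n++; // final split of size bytesRemaining (up to SPLIT_SLOP * splitSize)
        }
        return n;
    }

    public static void main(String[] args) {
        System.out.println(countSplits(105, 100)); // 1: 1.05 <= 1.1, one 105-byte split
        System.out.println(countSplits(120, 100)); // 2: 1.2 > 1.1, splits of 100 and 20
    }
}
```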
The log message is somewhat different from the actual meaning. ``previousSplitSize`` is the number of files in the partition directories, and ``splits`` is the total number of fragments. For example, suppose there are two files across all partitions and each file makes 5 fragments; then the total number of fragments is 10. In that case, 10 - 2 is just 8, and 8 is not the number of splits per partition. The actual answer should be 5. In addition, we can only say "X per Y" when X is the same across all Ys, and it is hard to get one answer here because partition files can vary in size. So an average is more appropriate here, I think.
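The arithmetic in the example above, worked out as a small sketch (the variable names are illustrative, not from the patch):

```java
public class SplitAverage {
    // The quantity the reviewer suggests logging: total fragments divided
    // by the number of files, i.e. an average, not a difference.
    static double averageSplitsPerFile(int totalFragments, int files) {
        return (double) totalFragments / files;
    }

    public static void main(String[] args) {
        int files = 2;
        int fragmentsPerFile = 5;
        int totalFragments = files * fragmentsPerFile;       // 10
        int difference = totalFragments - files;             // 10 - 2 = 8, not a per-partition count
        double average = averageSplitsPerFile(totalFragments, files); // 5.0, the meaningful figure
        System.out.println(difference + " vs " + average);
    }
}
```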
> Implement PartitionFileFragment
> -------------------------------
>
> Key: TAJO-1952
> URL: https://issues.apache.org/jira/browse/TAJO-1952
> Project: Tajo
> Issue Type: Improvement
> Components: Planner/Optimizer, Storage
> Reporter: Jaehwa Jung
> Assignee: Jaehwa Jung
> Fix For: 0.12.0, 0.11.1
>
> Attachments: TAJO-1952.patch
>
>
> Currently, PartitionedTableScanNode contains the list of partitions, and it
> seems to me that the list has the following problems:
> 1. Duplicated information: a Task contains a Fragment which specifies the target
> directory or target file for scanning, so the paths in the partition list are
> already written to Fragments.
> 2. Network resources: when scanning lots of partitions, the list occupies network
> resources, for example several hundred kilobytes or more. This looks like an
> unnecessary cost because the Fragment already has the path of the partition.
> I want to fix the above problems by implementing a new Fragment called
> PartitionedFileFragment. Currently, I'm planning the implementation as
> follows:
> * PartitionedFileFragment will be based on FileFragment and will contain the
> partition path and the partition key values.
> * Remove the path array of partitions from PartitionedTableScanNode.
> * Implement a method for getting filtered partition directories in
> FileTablespace.
> * Implement a method for making a PartitionedFileFragment array.
> * Before making splits, call the above methods and use them for making splits.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)