Github user hyunsik commented on a diff in the pull request:
https://github.com/apache/tajo/pull/846#discussion_r46244839
--- Diff:
tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileTablespace.java
---
@@ -550,6 +551,181 @@ public long getMinSplitSize() {
return splits;
}
+
////////////////////////////////////////////////////////////////////////////////
+ // The below code is for splitting partitioned table.
+
////////////////////////////////////////////////////////////////////////////////
+
+ /**
+  * Splits the entries listed under {@code tablePath} into fragments whose maximum length is
+  * the default block size reported by the class-level {@code fs}.
+  *
+  * NOTE(review): the block size is read from the class-level {@code fs}, but the private
+  * overload lists files through {@code tablePath.getFileSystem(conf)}. If those are different
+  * file systems, the block size may not belong to the one holding the files — confirm.
+  *
+  * @param tableName     name stored in every fragment
+  * @param tablePath     path whose listing is split
+  * @param partitionKeys partition key string attached to every fragment
+  * @return the fragments, in listing order
+  * @throws IOException  propagated from the file-system calls
+  */
+ public PartitionFileFragment[] partitionSplit(String tableName, Path
tablePath, String partitionKeys)
+ throws IOException {
+ return partitionSplit(tableName, tablePath, fs.getDefaultBlockSize(),
partitionKeys);
+ }
+
+ /**
+  * Splits each entry returned by {@code listStatus(tablePath)} into consecutive fragments of
+  * at most {@code size} bytes. The last fragment of an entry holds the remainder. No recursion
+  * is done here, and the fragments are created without host locations.
+  *
+  * NOTE(review): {@code size} must be positive. With {@code size <= 0} and a non-empty file,
+  * the while loop below never terminates.
+  * NOTE(review): the listing is not filtered by file/directory, and zero-length entries still
+  * yield a zero-length fragment — confirm this is intended.
+  *
+  * @param tableName     name stored in every fragment
+  * @param tablePath     path whose direct listing is split
+  * @param size          maximum fragment length in bytes
+  * @param partitionKeys partition key string attached to every fragment
+  * @return the fragments, in listing order
+  * @throws IOException  propagated from the file-system calls
+  */
+ private PartitionFileFragment[] partitionSplit(String tableName, Path
tablePath, long size, String partitionKeys)
+ throws IOException {
+ // This local shadows the class-level fs that the public overload uses for the block size.
+ FileSystem fs = tablePath.getFileSystem(conf);
+
+ long defaultBlockSize = size;
+ List<PartitionFileFragment> listTablets = new ArrayList<>();
+ PartitionFileFragment tablet;
+
+ FileStatus[] fileLists = fs.listStatus(tablePath);
+ for (FileStatus file : fileLists) {
+ long remainFileSize = file.getLen();
+ long start = 0;
+ if (remainFileSize > defaultBlockSize) {
+ // Emit full-size fragments while more than one block remains. The remainder
+ // (between 1 and defaultBlockSize bytes) becomes the final fragment below.
+ while (remainFileSize > defaultBlockSize) {
+ tablet = new PartitionFileFragment(tableName, file.getPath(),
start, defaultBlockSize, partitionKeys);
+ listTablets.add(tablet);
+ start += defaultBlockSize;
+ remainFileSize -= defaultBlockSize;
+ }
+ listTablets.add(new PartitionFileFragment(tableName,
file.getPath(), start, remainFileSize, partitionKeys));
+ } else {
+ // Entries no larger than one block become a single fragment at offset 0.
+ listTablets.add(new PartitionFileFragment(tableName,
file.getPath(), 0, remainFileSize, partitionKeys));
+ }
+ }
+
+ PartitionFileFragment[] tablets = new
PartitionFileFragment[listTablets.size()];
+ listTablets.toArray(tablets);
+
+ return tablets;
+ }
+
+ /**
+  * Creates a PartitionFileFragment covering {@code length} bytes of {@code file} starting at
+  * {@code start}, with the given host list and partition keys.
+  * NOTE(review): presumably protected so subclasses can override fragment creation — confirm.
+  *
+  * @param fragmentId    identifier stored in the fragment
+  * @param file          file the fragment belongs to
+  * @param start         byte offset of the fragment within {@code file}
+  * @param length        fragment length in bytes
+  * @param hosts         host names recorded in the fragment
+  * @param partitionKeys partition key string attached to the fragment
+  * @return the new fragment
+  */
+ protected PartitionFileFragment makePartitionSplit(String fragmentId,
Path file, long start, long length,
+ String[] hosts,
String partitionKeys) {
+ return new PartitionFileFragment(fragmentId, file, start, length,
hosts, partitionKeys);
+ }
+
+ /**
+  * Creates a PartitionFileFragment for {@code file} from a single {@code blockLocation}.
+  * NOTE(review): offset, length and hosts are presumably derived from the BlockLocation inside
+  * the PartitionFileFragment constructor — confirm there.
+  *
+  * @param fragmentId    identifier stored in the fragment
+  * @param file          file the fragment belongs to
+  * @param blockLocation block the fragment is built from
+  * @param partitionKeys partition key string attached to the fragment
+  * @return the new fragment
+  * @throws IOException  declared here; presumably raised while reading the BlockLocation
+  */
+ protected PartitionFileFragment makePartitionSplit(String fragmentId,
Path file, BlockLocation blockLocation
+ , String partitionKeys) throws IOException {
+ return new PartitionFileFragment(fragmentId, file, blockLocation,
partitionKeys);
+ }
+
+ /**
+  * Creates one fragment covering {@code length} bytes of {@code file} from {@code start}. Its
+  * host list is formed from the hosts that appear in the most entries of {@code blkLocations}.
+  * The number of hosts kept equals {@code blkLocations[0].getHosts().length}.
+  *
+  * NOTE(review): {@code blkLocations} must be non-empty, otherwise {@code blkLocations[0]}
+  * throws ArrayIndexOutOfBoundsException.
+  * NOTE(review): hosts with equal counts keep HashMap iteration order, which is unspecified,
+  * so tie-breaking is not deterministic across JVMs.
+  * NOTE(review): despite the name, the result is a PartitionFileFragment carrying
+  * {@code partitionKeys} — confirm the naming.
+  *
+  * @param fragmentId    identifier stored in the fragment
+  * @param file          file the fragment belongs to
+  * @param start         byte offset of the fragment within {@code file}
+  * @param length        fragment length in bytes
+  * @param blkLocations  block locations used to rank hosts
+  * @param partitionKeys partition key string attached to the fragment
+  * @return the new fragment
+  */
+ protected Fragment makeNonPartitionSplit(String fragmentId, Path file,
long start, long length,
+ BlockLocation[] blkLocations,
String partitionKeys) throws IOException {
+
+ // For each host, count how many of the given block locations list it.
+ Map<String, Integer> hostsBlockMap = new HashMap<>();
+ for (BlockLocation blockLocation : blkLocations) {
+ for (String host : blockLocation.getHosts()) {
+ if (hostsBlockMap.containsKey(host)) {
+ hostsBlockMap.put(host, hostsBlockMap.get(host) + 1);
+ } else {
+ hostsBlockMap.put(host, 1);
+ }
+ }
+ }
+
+ // Sort ascending by count. The loop below reads from the end to take the highest counts first.
+ List<Map.Entry<String, Integer>> entries = new
ArrayList<>(hostsBlockMap.entrySet());
+ Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>()
{
+
+ @Override
+ public int compare(Map.Entry<String, Integer> v1, Map.Entry<String,
Integer> v2) {
+ return v1.getValue().compareTo(v2.getValue());
+ }
+ });
+
+ // entries holds every distinct host of blkLocations[0], so this fits as long as
+ // blkLocations[0].getHosts() contains no duplicate names.
+ String[] hosts = new String[blkLocations[0].getHosts().length];
+
+ for (int i = 0; i < hosts.length; i++) {
+ Map.Entry<String, Integer> entry = entries.get((entries.size() - 1)
- i);
+ hosts[i] = entry.getKey();
+ }
+
+ return new PartitionFileFragment(fragmentId, file, start, length,
hosts, partitionKeys);
+ }
+
+ /**
+ * Generate the list of files and make them into PartitionedFileSplits.
+ *
+ * @throws IOException
+ */
+ public List<Fragment> getPartitionSplits(String tableName, TableMeta
meta, Schema schema, String[] partitionKeys,
+ Path... inputs) throws
IOException {
+ // generate splits'
+
+ List<Fragment> splits = Lists.newArrayList();
+ List<Fragment> volumeSplits = Lists.newArrayList();
+ List<BlockLocation> blockLocations = Lists.newArrayList();
+
+ int i = 0;
+ for (Path p : inputs) {
+ ArrayList<FileStatus> files = Lists.newArrayList();
+ if (fs.isFile(p)) {
+ files.addAll(Lists.newArrayList(fs.getFileStatus(p)));
+ } else {
+ files.addAll(listStatus(p));
+ }
+
+ int previousSplitSize = splits.size();
--- End diff --
It would be better if `previousSplitSize` were marked `final`.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact the infrastructure team at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---