/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.common.Util;

import java.io.*;
import java.net.URI;
import java.util.*;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;

/**
 * Utility methods that discover the local disk devices of a node and the
 * mount points associated with them, used for disk-aware scheduling.
 */
public class DiskUtil {

  /** Linux kernel partition table; one row per partition of each block device. */
  static String UNIX_DISK_DEVICE_PATH = "/proc/partitions";

  public enum OSType {
    OS_TYPE_UNIX, OS_TYPE_WINXP, OS_TYPE_SOLARIS, OS_TYPE_MAC
  }

  /**
   * Classifies the running operating system from the {@code os.name} system
   * property. Anything that is not Windows, Solaris or Mac is treated as UNIX.
   */
  static private OSType getOSType() {
    String osName = System.getProperty("os.name");
    if (osName.contains("Windows")
        && (osName.contains("XP") || osName.contains("2003")
            || osName.contains("Vista")
            || osName.contains("Windows_7")
            || osName.contains("Windows 7") || osName
            .contains("Windows7"))) {
      return OSType.OS_TYPE_WINXP;
    } else if (osName.contains("SunOS") || osName.contains("Solaris")) {
      return OSType.OS_TYPE_SOLARIS;
    } else if (osName.contains("Mac")) {
      return OSType.OS_TYPE_MAC;
    } else {
      return OSType.OS_TYPE_UNIX;
    }
  }

  /**
   * Returns the disk devices of this node. On UNIX the devices are parsed
   * from {@code /proc/partitions} and enriched with mount information from
   * the {@code mount} command; on other platforms a single "default" device
   * is returned.
   *
   * @throws IOException if reading the mount table fails
   */
  public static List<DiskDeviceInfo> getDiskDeviceInfos() throws IOException {
    List<DiskDeviceInfo> deviceInfos;

    if (getOSType() == OSType.OS_TYPE_UNIX) {
      deviceInfos = getUnixDiskDeviceInfos();
      setDeviceMountInfo(deviceInfos);
    } else {
      deviceInfos = getDefaultDiskDeviceInfos();
    }

    return deviceInfos;
  }

  /**
   * Parses {@code /proc/partitions} and returns one {@link DiskDeviceInfo}
   * per distinct disk device (partition numbers stripped). Falls back to
   * {@link #getDefaultDiskDeviceInfos()} when the file does not exist.
   */
  private static List<DiskDeviceInfo> getUnixDiskDeviceInfos() {
    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();

    File file = new File(UNIX_DISK_DEVICE_PATH);
    if (!file.exists()) {
      System.out.println("No partition file:" + file.getAbsolutePath());
      return getDefaultDiskDeviceInfos();
    }

    // try-with-resources replaces the original manual close() whose
    // IOException was silently swallowed in an empty catch block.
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(new FileInputStream(UNIX_DISK_DEVICE_PATH)))) {
      String line;
      int count = 0;
      Set<String> deviceNames = new TreeSet<String>();
      while ((line = reader.readLine()) != null) {
        // skip the header row (count == 0) and blank separator lines
        if (count > 0 && !line.trim().isEmpty()) {
          String[] tokens = line.trim().split(" +");
          if (tokens.length == 4) {
            // tokens[3] is the partition name, e.g. "sda1"
            deviceNames.add(getDiskDeviceName(tokens[3]));
          }
        }
        count++;
      }

      int id = 0;
      for (String eachDeviceName : deviceNames) {
        DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(id++);
        diskDeviceInfo.setName(eachDeviceName);

        //TODO set additional info from /sys/block/<device>/queue
        infos.add(diskDeviceInfo);
      }
    } catch (Exception e) {
      // best-effort: return whatever was parsed so far, as before
      e.printStackTrace();
    }

    return infos;
  }

  /**
   * Strips the trailing partition number from a partition name,
   * e.g. {@code "sda1"} becomes {@code "sda"}. Works on chars directly,
   * avoiding the platform-default-charset pitfall of String.getBytes()
   * in the original implementation (same result for ASCII device names).
   */
  private static String getDiskDeviceName(String partitionName) {
    int length = 0;
    while (length < partitionName.length()) {
      char c = partitionName.charAt(length);
      if (c >= '0' && c <= '9') {
        break;
      }
      length++;
    }
    return partitionName.substring(0, length);
  }

  /** Returns a single placeholder device named "default" (id 0). */
  public static List<DiskDeviceInfo> getDefaultDiskDeviceInfos() {
    DiskDeviceInfo diskDeviceInfo = new DiskDeviceInfo(0);
    diskDeviceInfo.setName("default");

    List<DiskDeviceInfo> infos = new ArrayList<DiskDeviceInfo>();
    infos.add(diskDeviceInfo);

    return infos;
  }

  /**
   * Runs the {@code mount} command and attaches each mount path to the
   * matching device in {@code deviceInfos} (matching on device name with
   * partition numbers stripped).
   *
   * @throws IOException if reading the command output fails
   */
  private static void setDeviceMountInfo(List<DiskDeviceInfo> deviceInfos) throws IOException {
    Map<String, DiskDeviceInfo> deviceMap = new HashMap<String, DiskDeviceInfo>();
    for (DiskDeviceInfo eachDevice : deviceInfos) {
      deviceMap.put(eachDevice.getName(), eachDevice);
    }

    BufferedReader mountOutput = null;
    try {
      Process mountProcess = Runtime.getRuntime().exec("mount");
      mountOutput = new BufferedReader(new InputStreamReader(
          mountProcess.getInputStream()));
      while (true) {
        String line = mountOutput.readLine();
        if (line == null) {
          break;
        }

        int indexStart = line.indexOf(" on /");
        if (indexStart < 0) {
          // Fix: the original assumed every line matched " on /" and threw
          // StringIndexOutOfBoundsException on lines that do not (e.g.
          // locale-dependent or malformed mount output). Skip them instead.
          continue;
        }
        int indexEnd = line.indexOf(" ", indexStart + 4);
        if (indexEnd < 0) {
          // Fix: guard against a mount path that is the last token on the line.
          indexEnd = line.length();
        }

        String deviceName = line.substring(0, indexStart).trim();
        String[] deviceNameTokens = deviceName.split("/");
        if (deviceNameTokens.length == 3) {
          if ("dev".equals(deviceNameTokens[1])) {
            String realDeviceName = getDiskDeviceName(deviceNameTokens[2]);
            String mountPath = new File(line.substring(indexStart + 4, indexEnd)).getAbsolutePath();

            DiskDeviceInfo diskDeviceInfo = deviceMap.get(realDeviceName);
            if (diskDeviceInfo != null) {
              diskDeviceInfo.addMountPath(new DiskMountInfo(diskDeviceInfo.getId(), mountPath));
            }
          }
        }
      }
    } catch (IOException e) {
      throw e;
    } finally {
      if (mountOutput != null) {
        mountOutput.close();
      }
    }
  }

  /** Returns the number of configured DataNode storage directories. */
  public static int getDataNodeStorageSize() {
    return getStorageDirs().size();
  }

  /** Reads {@code dfs.datanode.data.dir} from the HDFS configuration as URIs. */
  public static List<URI> getStorageDirs() {
    Configuration conf = new HdfsConfiguration();
    Collection<String> dirNames = conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
    return Util.stringCollectionAsURIs(dirNames);
  }

  public static void main(String[] args) throws Exception {
    System.out.println("/dev/sde1".split("/").length);
    for (String eachToken : "/dev/sde1".split("/")) {
      System.out.println(eachToken);
    }
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import io.netty.buffer.ByteBuf;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.datum.Datum;

import java.io.IOException;
import java.io.OutputStream;

/**
 * Converts a single column value between its {@link Datum} form and its
 * on-disk byte representation, honoring the table's null-character encoding.
 */
public interface FieldSerializerDeserializer {

  /**
   * Writes {@code datum} for column {@code col} to {@code out}.
   *
   * @param nullChars bytes used to represent a null value
   * @return the number of bytes written
   * @throws IOException on write failure
   */
  int serialize(OutputStream out, Datum datum, Column col, int columnIndex, byte[] nullChars) throws IOException;

  /**
   * Reads one value for column {@code col} from {@code buf}.
   *
   * @param nullChars bytes that mark a null value in the input
   * @return the decoded datum
   * @throws IOException on read failure
   */
  Datum deserialize(ByteBuf buf, Column col, int columnIndex, ByteBuf nullChars) throws IOException;

}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;

import java.io.IOException;

/**
 * Base class for appenders that write task output to a file. Resolves the
 * output file path from the task attempt id (or writes directly into the
 * working directory when no attempt id is given, as in tests).
 */
public abstract class FileAppender implements Appender {
  private static final Log LOG = LogFactory.getLog(FileAppender.class);

  protected boolean inited = false;

  protected final Configuration conf;
  protected final TableMeta meta;
  protected final Schema schema;
  protected final Path workDir;
  protected final QueryUnitAttemptId taskAttemptId;

  protected boolean enabledStats;
  protected Path path;

  public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
                      TableMeta meta, Path workDir) {
    this.conf = conf;
    this.meta = meta;
    this.schema = schema;
    this.workDir = workDir;
    this.taskAttemptId = taskAttemptId;

    try {
      // A null attempt id indicates a test context: write straight into workDir.
      this.path = (taskAttemptId == null)
          ? workDir
          : StorageManager.getFileStorageManager((TajoConf) conf)
              .getAppenderFilePath(taskAttemptId, workDir);
    } catch (IOException e) {
      // NOTE(review): path stays null on failure; callers seem to rely on a
      // later failure in init() rather than the constructor throwing — confirm.
      LOG.error(e.getMessage(), e);
    }
  }

  /**
   * Marks this appender as initialized.
   *
   * @throws IllegalStateException when called twice
   */
  public void init() throws IOException {
    if (inited) {
      throw new IllegalStateException("FileAppender is already initialized.");
    }
    inited = true;
  }

  /**
   * Turns on statistics collection; must be invoked before {@link #init()}.
   *
   * @throws IllegalStateException when called after initialization
   */
  public void enableStats() {
    if (inited) {
      throw new IllegalStateException("Should enable this option before init()");
    }
    this.enabledStats = true;
  }

  /** Estimates the output size as the current write offset. */
  public long getEstimatedOutputSize() throws IOException {
    return getOffset();
  }

  /** Current position in the output file, in bytes. */
  public abstract long getOffset() throws IOException;
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.statistics.ColumnStats;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;

import java.io.IOException;

/**
 * Base class for scanners that read a single {@link FileFragment} of a table,
 * tracking progress and per-column input statistics.
 */
public abstract class FileScanner implements Scanner {
  private static final Log LOG = LogFactory.getLog(FileScanner.class);

  protected boolean inited = false;
  protected final Configuration conf;
  protected final TableMeta meta;
  protected final Schema schema;
  protected final FileFragment fragment;
  protected final int columnNum;

  protected Column [] targets;

  protected float progress;

  protected TableStats tableStats;

  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
    this.conf = conf;
    this.meta = meta;
    this.schema = schema;
    this.fragment = (FileFragment) fragment;
    this.tableStats = new TableStats();
    this.columnNum = this.schema.size();
  }

  /**
   * Resets progress and seeds the input statistics from the fragment length
   * and the schema's columns.
   */
  public void init() throws IOException {
    inited = true;
    progress = 0.0f;

    if (fragment != null) {
      tableStats.setNumBytes(fragment.getLength());
      tableStats.setNumBlocks(1);
    }
    if (schema != null) {
      for (Column eachColumn : schema.getColumns()) {
        tableStats.addColumnStat(new ColumnStats(eachColumn));
      }
    }
  }

  @Override
  public Schema getSchema() {
    return schema;
  }

  /**
   * Sets the projected columns; must be called before {@link #init()}.
   *
   * @throws IllegalStateException when called after initialization
   */
  @Override
  public void setTarget(Column[] targets) {
    if (inited) {
      throw new IllegalStateException("Should be called before init()");
    }
    this.targets = targets;
  }

  /**
   * Hook for push-down predicates; the base implementation only validates
   * that initialization has not happened yet.
   */
  public void setSearchCondition(Object expr) {
    if (inited) {
      throw new IllegalStateException("Should be called before init()");
    }
  }

  /**
   * Opens a FileSystem for {@code path}, preferably as the configured Tajo
   * user; falls back to the default user when none is configured or when the
   * user-scoped lookup is interrupted.
   */
  public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
    String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
    if (tajoUser == null) {
      return FileSystem.get(path.toUri(), tajoConf);
    }
    try {
      return FileSystem.get(path.toUri(), tajoConf, tajoUser);
    } catch (InterruptedException e) {
      LOG.warn("Occur InterruptedException while FileSystem initiating with user[" + tajoUser + "]");
      return FileSystem.get(path.toUri(), tajoConf);
    }
  }

  @Override
  public float getProgress() {
    return progress;
  }

  @Override
  public TableStats getInputStats() {
    return tableStats;
  }
}
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.DistributedFileSystem; -import org.apache.tajo.*; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.ScanNode; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.util.*; - -import java.io.IOException; -import java.text.NumberFormat; -import java.util.*; -import java.util.concurrent.atomic.AtomicInteger; - -public class FileStorageManager extends StorageManager { - private final Log LOG = LogFactory.getLog(FileStorageManager.class); - - static final String OUTPUT_FILE_PREFIX="part-"; - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SUBQUERY = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(2); - return fmt; - } - }; - static final ThreadLocal<NumberFormat> 
OUTPUT_FILE_FORMAT_TASK = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(6); - return fmt; - } - }; - - static final ThreadLocal<NumberFormat> OUTPUT_FILE_FORMAT_SEQ = - new ThreadLocal<NumberFormat>() { - @Override - public NumberFormat initialValue() { - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - return fmt; - } - }; - - protected FileSystem fs; - protected Path tableBaseDir; - protected boolean blocksMetadataEnabled; - private static final HdfsVolumeId zeroVolumeId = new HdfsVolumeId(Bytes.toBytes(0)); - - public FileStorageManager(StoreType storeType) { - super(storeType); - } - - @Override - protected void storageInit() throws IOException { - this.tableBaseDir = TajoConf.getWarehouseDir(conf); - this.fs = tableBaseDir.getFileSystem(conf); - this.blocksMetadataEnabled = conf.getBoolean(DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, - DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT); - if (!this.blocksMetadataEnabled) - LOG.warn("does not support block metadata. 
('dfs.datanode.hdfs-blocks-metadata.enabled')"); - } - - public Scanner getFileScanner(TableMeta meta, Schema schema, Path path) - throws IOException { - FileSystem fs = path.getFileSystem(conf); - FileStatus status = fs.getFileStatus(path); - return getFileScanner(meta, schema, path, status); - } - - public Scanner getFileScanner(TableMeta meta, Schema schema, Path path, FileStatus status) - throws IOException { - Fragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - return getScanner(meta, schema, fragment); - } - - public FileSystem getFileSystem() { - return this.fs; - } - - public Path getWarehouseDir() { - return this.tableBaseDir; - } - - public void delete(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - fs.delete(tablePath, true); - } - - public boolean exists(Path path) throws IOException { - FileSystem fileSystem = path.getFileSystem(conf); - return fileSystem.exists(path); - } - - /** - * This method deletes only data contained in the given path. - * - * @param path The path in which data are deleted. 
- * @throws IOException - */ - public void deleteData(Path path) throws IOException { - FileSystem fileSystem = path.getFileSystem(conf); - FileStatus[] fileLists = fileSystem.listStatus(path); - for (FileStatus status : fileLists) { - fileSystem.delete(status.getPath(), true); - } - } - - public Path getTablePath(String tableName) { - return new Path(tableBaseDir, tableName); - } - - @VisibleForTesting - public Appender getAppender(TableMeta meta, Schema schema, Path filePath) - throws IOException { - return getAppender(null, null, meta, schema, filePath); - } - - public FileFragment[] split(String tableName) throws IOException { - Path tablePath = new Path(tableBaseDir, tableName); - return split(tableName, tablePath, fs.getDefaultBlockSize()); - } - - public FileFragment[] split(String tableName, long fragmentSize) throws IOException { - Path tablePath = new Path(tableBaseDir, tableName); - return split(tableName, tablePath, fragmentSize); - } - - public FileFragment[] splitBroadcastTable(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - tablet = new FileFragment(tablePath.getName(), file.getPath(), 0, file.getLen()); - listTablets.add(tablet); - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public FileFragment[] split(Path tablePath) throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - return split(tablePath.getName(), tablePath, fs.getDefaultBlockSize()); - } - - public FileFragment[] split(String tableName, Path tablePath) throws IOException { - return split(tableName, tablePath, fs.getDefaultBlockSize()); - } - - private FileFragment[] split(String tableName, Path tablePath, long size) - throws IOException { - FileSystem fs = 
tablePath.getFileSystem(conf); - - long defaultBlockSize = size; - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - long remainFileSize = file.getLen(); - long start = 0; - if (remainFileSize > defaultBlockSize) { - while (remainFileSize > defaultBlockSize) { - tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); - listTablets.add(tablet); - start += defaultBlockSize; - remainFileSize -= defaultBlockSize; - } - listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); - } else { - listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public static FileFragment[] splitNG(Configuration conf, String tableName, TableMeta meta, - Path tablePath, long size) - throws IOException { - FileSystem fs = tablePath.getFileSystem(conf); - - long defaultBlockSize = size; - List<FileFragment> listTablets = new ArrayList<FileFragment>(); - FileFragment tablet; - - FileStatus[] fileLists = fs.listStatus(tablePath); - for (FileStatus file : fileLists) { - long remainFileSize = file.getLen(); - long start = 0; - if (remainFileSize > defaultBlockSize) { - while (remainFileSize > defaultBlockSize) { - tablet = new FileFragment(tableName, file.getPath(), start, defaultBlockSize); - listTablets.add(tablet); - start += defaultBlockSize; - remainFileSize -= defaultBlockSize; - } - listTablets.add(new FileFragment(tableName, file.getPath(), start, remainFileSize)); - } else { - listTablets.add(new FileFragment(tableName, file.getPath(), 0, remainFileSize)); - } - } - - FileFragment[] tablets = new FileFragment[listTablets.size()]; - listTablets.toArray(tablets); - - return tablets; - } - - public long calculateSize(Path tablePath) throws IOException { 
- FileSystem fs = tablePath.getFileSystem(conf); - long totalSize = 0; - - if (fs.exists(tablePath)) { - totalSize = fs.getContentSummary(tablePath).getLength(); - } - - return totalSize; - } - - ///////////////////////////////////////////////////////////////////////////// - // FileInputFormat Area - ///////////////////////////////////////////////////////////////////////////// - - public static final PathFilter hiddenFileFilter = new PathFilter() { - public boolean accept(Path p) { - String name = p.getName(); - return !name.startsWith("_") && !name.startsWith("."); - } - }; - - public Path getAppenderFilePath(QueryUnitAttemptId taskAttemptId, Path workDir) { - if (taskAttemptId == null) { - // For testcase - return workDir; - } - // The final result of a task will be written in a file named part-ss-nnnnnnn, - // where ss is the subquery id associated with this task, and nnnnnn is the task id. - Path outFilePath = StorageUtil.concatPath(workDir, TajoConstants.RESULT_DIR_NAME, - OUTPUT_FILE_PREFIX + - OUTPUT_FILE_FORMAT_SUBQUERY.get().format(taskAttemptId.getQueryUnitId().getExecutionBlockId().getId()) + "-" + - OUTPUT_FILE_FORMAT_TASK.get().format(taskAttemptId.getQueryUnitId().getId()) + "-" + - OUTPUT_FILE_FORMAT_SEQ.get().format(0)); - LOG.info("Output File Path: " + outFilePath); - - return outFilePath; - } - - /** - * Proxy PathFilter that accepts a path only if all filters given in the - * constructor do. Used by the listPaths() to apply the built-in - * hiddenFileFilter together with a user provided one (if any). - */ - private static class MultiPathFilter implements PathFilter { - private List<PathFilter> filters; - - public MultiPathFilter(List<PathFilter> filters) { - this.filters = filters; - } - - public boolean accept(Path path) { - for (PathFilter filter : filters) { - if (!filter.accept(path)) { - return false; - } - } - return true; - } - } - - /** - * List input directories. 
- * Subclasses may override to, e.g., select only files matching a regular - * expression. - * - * @return array of FileStatus objects - * @throws IOException if zero items. - */ - protected List<FileStatus> listStatus(Path... dirs) throws IOException { - List<FileStatus> result = new ArrayList<FileStatus>(); - if (dirs.length == 0) { - throw new IOException("No input paths specified in job"); - } - - List<IOException> errors = new ArrayList<IOException>(); - - // creates a MultiPathFilter with the hiddenFileFilter and the - // user provided one (if any). - List<PathFilter> filters = new ArrayList<PathFilter>(); - filters.add(hiddenFileFilter); - - PathFilter inputFilter = new MultiPathFilter(filters); - - for (int i = 0; i < dirs.length; ++i) { - Path p = dirs[i]; - - FileSystem fs = p.getFileSystem(conf); - FileStatus[] matches = fs.globStatus(p, inputFilter); - if (matches == null) { - errors.add(new IOException("Input path does not exist: " + p)); - } else if (matches.length == 0) { - errors.add(new IOException("Input Pattern " + p + " matches 0 files")); - } else { - for (FileStatus globStat : matches) { - if (globStat.isDirectory()) { - for (FileStatus stat : fs.listStatus(globStat.getPath(), - inputFilter)) { - result.add(stat); - } - } else { - result.add(globStat); - } - } - } - } - - if (!errors.isEmpty()) { - throw new InvalidInputException(errors); - } - LOG.info("Total input paths to process : " + result.size()); - return result; - } - - /** - * Is the given filename splitable? Usually, true, but if the file is - * stream compressed, it will not be. - * <p/> - * <code>FileInputFormat</code> implementations can override this and return - * <code>false</code> to ensure that individual input files are never split-up - * so that Mappers process entire files. - * - * - * @param path the file name to check - * @param status get the file length - * @return is this file isSplittable? 
- */ - protected boolean isSplittable(TableMeta meta, Schema schema, Path path, FileStatus status) throws IOException { - Scanner scanner = getFileScanner(meta, schema, path, status); - boolean split = scanner.isSplittable(); - scanner.close(); - return split; - } - - private static final double SPLIT_SLOP = 1.1; // 10% slop - - protected int getBlockIndex(BlockLocation[] blkLocations, - long offset) { - for (int i = 0; i < blkLocations.length; i++) { - // is the offset inside this block? - if ((blkLocations[i].getOffset() <= offset) && - (offset < blkLocations[i].getOffset() + blkLocations[i].getLength())) { - return i; - } - } - BlockLocation last = blkLocations[blkLocations.length - 1]; - long fileLength = last.getOffset() + last.getLength() - 1; - throw new IllegalArgumentException("Offset " + offset + - " is outside of file (0.." + - fileLength + ")"); - } - - /** - * A factory that makes the split for this class. It can be overridden - * by sub-classes to make sub-types - */ - protected FileFragment makeSplit(String fragmentId, Path file, long start, long length) { - return new FileFragment(fragmentId, file, start, length); - } - - protected FileFragment makeSplit(String fragmentId, Path file, long start, long length, - String[] hosts) { - return new FileFragment(fragmentId, file, start, length, hosts); - } - - protected FileFragment makeSplit(String fragmentId, Path file, BlockLocation blockLocation) - throws IOException { - return new FileFragment(fragmentId, file, blockLocation); - } - - // for Non Splittable. 
eg, compressed gzip TextFile - protected FileFragment makeNonSplit(String fragmentId, Path file, long start, long length, - BlockLocation[] blkLocations) throws IOException { - - Map<String, Integer> hostsBlockMap = new HashMap<String, Integer>(); - for (BlockLocation blockLocation : blkLocations) { - for (String host : blockLocation.getHosts()) { - if (hostsBlockMap.containsKey(host)) { - hostsBlockMap.put(host, hostsBlockMap.get(host) + 1); - } else { - hostsBlockMap.put(host, 1); - } - } - } - - List<Map.Entry<String, Integer>> entries = new ArrayList<Map.Entry<String, Integer>>(hostsBlockMap.entrySet()); - Collections.sort(entries, new Comparator<Map.Entry<String, Integer>>() { - - @Override - public int compare(Map.Entry<String, Integer> v1, Map.Entry<String, Integer> v2) { - return v1.getValue().compareTo(v2.getValue()); - } - }); - - String[] hosts = new String[blkLocations[0].getHosts().length]; - - for (int i = 0; i < hosts.length; i++) { - Map.Entry<String, Integer> entry = entries.get((entries.size() - 1) - i); - hosts[i] = entry.getKey(); - } - return new FileFragment(fragmentId, file, start, length, hosts); - } - - /** - * Get the minimum split size - * - * @return the minimum number of bytes that can be in a split - */ - public long getMinSplitSize() { - return conf.getLongVar(TajoConf.ConfVars.MINIMUM_SPLIT_SIZE); - } - - /** - * Get Disk Ids by Volume Bytes - */ - private int[] getDiskIds(VolumeId[] volumeIds) { - int[] diskIds = new int[volumeIds.length]; - for (int i = 0; i < volumeIds.length; i++) { - int diskId = -1; - if (volumeIds[i] != null && volumeIds[i].hashCode() > 0) { - diskId = volumeIds[i].hashCode() - zeroVolumeId.hashCode(); - } - diskIds[i] = diskId; - } - return diskIds; - } - - /** - * Generate the map of host and make them into Volume Ids. 
- * - */ - private Map<String, Set<Integer>> getVolumeMap(List<FileFragment> frags) { - Map<String, Set<Integer>> volumeMap = new HashMap<String, Set<Integer>>(); - for (FileFragment frag : frags) { - String[] hosts = frag.getHosts(); - int[] diskIds = frag.getDiskIds(); - for (int i = 0; i < hosts.length; i++) { - Set<Integer> volumeList = volumeMap.get(hosts[i]); - if (volumeList == null) { - volumeList = new HashSet<Integer>(); - volumeMap.put(hosts[i], volumeList); - } - - if (diskIds.length > 0 && diskIds[i] > -1) { - volumeList.add(diskIds[i]); - } - } - } - - return volumeMap; - } - /** - * Generate the list of files and make them into FileSplits. - * - * @throws IOException - */ - public List<Fragment> getSplits(String tableName, TableMeta meta, Schema schema, Path... inputs) - throws IOException { - // generate splits' - - List<Fragment> splits = Lists.newArrayList(); - List<Fragment> volumeSplits = Lists.newArrayList(); - List<BlockLocation> blockLocations = Lists.newArrayList(); - - for (Path p : inputs) { - FileSystem fs = p.getFileSystem(conf); - ArrayList<FileStatus> files = Lists.newArrayList(); - if (fs.isFile(p)) { - files.addAll(Lists.newArrayList(fs.getFileStatus(p))); - } else { - files.addAll(listStatus(p)); - } - - int previousSplitSize = splits.size(); - for (FileStatus file : files) { - Path path = file.getPath(); - long length = file.getLen(); - if (length > 0) { - // Get locations of blocks of file - BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length); - boolean splittable = isSplittable(meta, schema, path, file); - if (blocksMetadataEnabled && fs instanceof DistributedFileSystem) { - - if (splittable) { - for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); - } - blockLocations.addAll(Arrays.asList(blkLocations)); - - } else { // Non splittable - long blockSize = blkLocations[0].getLength(); - if (blockSize >= length) { - 
blockLocations.addAll(Arrays.asList(blkLocations)); - for (BlockLocation blockLocation : blkLocations) { - volumeSplits.add(makeSplit(tableName, path, blockLocation)); - } - } else { - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); - } - } - - } else { - if (splittable) { - - long minSize = Math.max(getMinSplitSize(), 1); - - long blockSize = file.getBlockSize(); // s3n rest api contained block size but blockLocations is one - long splitSize = Math.max(minSize, blockSize); - long bytesRemaining = length; - - // for s3 - while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); - bytesRemaining -= splitSize; - } - if (bytesRemaining > 0) { - int blkIndex = getBlockIndex(blkLocations, length - bytesRemaining); - splits.add(makeSplit(tableName, path, length - bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); - } - } else { // Non splittable - splits.add(makeNonSplit(tableName, path, 0, length, blkLocations)); - } - } - } else { - //for zero length files - splits.add(makeSplit(tableName, path, 0, length)); - } - } - if(LOG.isDebugEnabled()){ - LOG.debug("# of splits per partition: " + (splits.size() - previousSplitSize)); - } - } - - // Combine original fileFragments with new VolumeId information - setVolumeMeta(volumeSplits, blockLocations); - splits.addAll(volumeSplits); - LOG.info("Total # of splits: " + splits.size()); - return splits; - } - - private void setVolumeMeta(List<Fragment> splits, final List<BlockLocation> blockLocations) - throws IOException { - - int locationSize = blockLocations.size(); - int splitSize = splits.size(); - if (locationSize == 0 || splitSize == 0) return; - - if (locationSize != splitSize) { - // splits and locations don't match up - LOG.warn("Number of block locations not equal to number of splits: " - + 
"#locations=" + locationSize - + " #splits=" + splitSize); - return; - } - - DistributedFileSystem fs = (DistributedFileSystem)DistributedFileSystem.get(conf); - int lsLimit = conf.getInt(DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT); - int blockLocationIdx = 0; - - Iterator<Fragment> iter = splits.iterator(); - while (locationSize > blockLocationIdx) { - - int subSize = Math.min(locationSize - blockLocationIdx, lsLimit); - List<BlockLocation> locations = blockLocations.subList(blockLocationIdx, blockLocationIdx + subSize); - //BlockStorageLocation containing additional volume location information for each replica of each block. - BlockStorageLocation[] blockStorageLocations = fs.getFileBlockStorageLocations(locations); - - for (BlockStorageLocation blockStorageLocation : blockStorageLocations) { - ((FileFragment)iter.next()).setDiskIds(getDiskIds(blockStorageLocation.getVolumeIds())); - blockLocationIdx++; - } - } - LOG.info("# of splits with volumeId " + splitSize); - } - - private static class InvalidInputException extends IOException { - List<IOException> errors; - public InvalidInputException(List<IOException> errors) { - this.errors = errors; - } - - @Override - public String getMessage(){ - StringBuffer sb = new StringBuffer(); - int messageLimit = Math.min(errors.size(), 10); - for (int i = 0; i < messageLimit ; i ++) { - sb.append(errors.get(i).getMessage()).append("\n"); - } - - if(messageLimit < errors.size()) - sb.append("skipped .....").append("\n"); - - return sb.toString(); - } - } - - @Override - public List<Fragment> getSplits(String tableName, TableDesc table, ScanNode scanNode) throws IOException { - return getSplits(tableName, table.getMeta(), table.getSchema(), new Path(table.getPath())); - } - - @Override - public void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException { - if (!tableDesc.isExternal()) { - String [] splitted = CatalogUtil.splitFQTableName(tableDesc.getName()); - String databaseName = 
splitted[0]; - String simpleTableName = splitted[1]; - - // create a table directory (i.e., ${WAREHOUSE_DIR}/${DATABASE_NAME}/${TABLE_NAME} ) - Path tablePath = StorageUtil.concatPath(tableBaseDir, databaseName, simpleTableName); - tableDesc.setPath(tablePath.toUri()); - } else { - Preconditions.checkState(tableDesc.getPath() != null, "ERROR: LOCATION must be given."); - } - - Path path = new Path(tableDesc.getPath()); - - FileSystem fs = path.getFileSystem(conf); - TableStats stats = new TableStats(); - if (tableDesc.isExternal()) { - if (!fs.exists(path)) { - LOG.error(path.toUri() + " does not exist"); - throw new IOException("ERROR: " + path.toUri() + " does not exist"); - } - } else { - fs.mkdirs(path); - } - - long totalSize = 0; - - try { - totalSize = calculateSize(path); - } catch (IOException e) { - LOG.warn("Cannot calculate the size of the relation", e); - } - - stats.setNumBytes(totalSize); - - if (tableDesc.isExternal()) { // if it is an external table, there is no way to know the exact row number without processing. - stats.setNumRows(TajoConstants.UNKNOWN_ROW_NUMBER); - } - - tableDesc.setStats(stats); - } - - @Override - public void purgeTable(TableDesc tableDesc) throws IOException { - try { - Path path = new Path(tableDesc.getPath()); - FileSystem fs = path.getFileSystem(conf); - LOG.info("Delete table data dir: " + path); - fs.delete(path, true); - } catch (IOException e) { - throw new InternalError(e.getMessage()); - } - } - - @Override - public List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) throws IOException { - // Listing table data file which is not empty. - // If the table is a partitioned table, return file list which has same partition key. 
- Path tablePath = new Path(tableDesc.getPath()); - FileSystem fs = tablePath.getFileSystem(conf); - - List<FileStatus> nonZeroLengthFiles = new ArrayList<FileStatus>(); - if (fs.exists(tablePath)) { - getNonZeroLengthDataFiles(fs, tablePath, nonZeroLengthFiles, currentPage, numFragments, - new AtomicInteger(0)); - } - - List<Fragment> fragments = new ArrayList<Fragment>(); - - //In the case of partitioned table, return same partition key data files. - int numPartitionColumns = 0; - if (tableDesc.hasPartition()) { - numPartitionColumns = tableDesc.getPartitionMethod().getExpressionSchema().getColumns().size(); - } - String[] previousPartitionPathNames = null; - for (FileStatus eachFile: nonZeroLengthFiles) { - FileFragment fileFragment = new FileFragment(tableDesc.getName(), eachFile.getPath(), 0, eachFile.getLen(), null); - - if (numPartitionColumns > 0) { - // finding partition key; - Path filePath = fileFragment.getPath(); - Path parentPath = filePath; - String[] parentPathNames = new String[numPartitionColumns]; - for (int i = 0; i < numPartitionColumns; i++) { - parentPath = parentPath.getParent(); - parentPathNames[numPartitionColumns - i - 1] = parentPath.getName(); - } - - // If current partitionKey == previousPartitionKey, add to result. 
- if (previousPartitionPathNames == null) { - fragments.add(fileFragment); - } else if (previousPartitionPathNames != null && Arrays.equals(previousPartitionPathNames, parentPathNames)) { - fragments.add(fileFragment); - } else { - break; - } - previousPartitionPathNames = parentPathNames; - } else { - fragments.add(fileFragment); - } - } - - return fragments; - } - - private void getNonZeroLengthDataFiles(FileSystem fs, Path path, List<FileStatus> result, - int startFileIndex, int numResultFiles, - AtomicInteger currentFileIndex) throws IOException { - if (fs.isDirectory(path)) { - FileStatus[] files = fs.listStatus(path, FileStorageManager.hiddenFileFilter); - if (files != null && files.length > 0) { - for (FileStatus eachFile : files) { - if (result.size() >= numResultFiles) { - return; - } - if (eachFile.isDirectory()) { - getNonZeroLengthDataFiles(fs, eachFile.getPath(), result, startFileIndex, numResultFiles, - currentFileIndex); - } else if (eachFile.isFile() && eachFile.getLen() > 0) { - if (currentFileIndex.get() >= startFileIndex) { - result.add(eachFile); - } - currentFileIndex.incrementAndGet(); - } - } - } - } else { - FileStatus fileStatus = fs.getFileStatus(path); - if (fileStatus != null && fileStatus.getLen() > 0) { - if (currentFileIndex.get() >= startFileIndex) { - result.add(fileStatus); - } - currentFileIndex.incrementAndGet(); - if (result.size() >= numResultFiles) { - return; - } - } - } - } - - @Override - public StorageProperty getStorageProperty() { - StorageProperty storageProperty = new StorageProperty(); - storageProperty.setSortedInsert(false); - if (storeType == StoreType.RAW) { - storageProperty.setSupportsInsertInto(false); - } else { - storageProperty.setSupportsInsertInto(true); - } - - return storageProperty; - } - - @Override - public void closeStorageManager() { - } - - @Override - public void beforeInsertOrCATS(LogicalNode node) throws IOException { - } - - @Override - public void rollbackOutputCommit(LogicalNode node) throws 
IOException { - } - - @Override - public TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, - Schema inputSchema, SortSpec[] sortSpecs, TupleRange dataRange) - throws IOException { - return null; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java deleted file mode 100644 index 8b7e2e0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/FrameTuple.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.storage; - -import com.google.common.base.Preconditions; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.exception.UnsupportedException; - -/** - * An instance of FrameTuple is an immutable tuple. - * It contains two tuples and pretends to be one instance of Tuple for - * join qual evaluatations. 
- */ -public class FrameTuple implements Tuple, Cloneable { - private int size; - private int leftSize; - - private Tuple left; - private Tuple right; - - public FrameTuple() {} - - public FrameTuple(Tuple left, Tuple right) { - set(left, right); - } - - public void set(Tuple left, Tuple right) { - this.size = left.size() + right.size(); - this.left = left; - this.leftSize = left.size(); - this.right = right; - } - - @Override - public int size() { - return size; - } - - @Override - public boolean contains(int fieldId) { - Preconditions.checkArgument(fieldId < size, - "Out of field access: " + fieldId); - - if (fieldId < leftSize) { - return left.contains(fieldId); - } else { - return right.contains(fieldId - leftSize); - } - } - - @Override - public boolean isNull(int fieldid) { - return get(fieldid).isNull(); - } - - @Override - public boolean isNotNull(int fieldid) { - return !isNull(fieldid); - } - - @Override - public void clear() { - throw new UnsupportedException(); - } - - @Override - public void put(int fieldId, Datum value) { - throw new UnsupportedException(); - } - - @Override - public void put(int fieldId, Datum[] values) { - throw new UnsupportedException(); - } - - @Override - public void put(int fieldId, Tuple tuple) { - throw new UnsupportedException(); - } - - @Override - public void setOffset(long offset) { - throw new UnsupportedException(); - } - - @Override - public long getOffset() { - throw new UnsupportedException(); - } - - @Override - public void put(Datum [] values) { - throw new UnsupportedException(); - } - - @Override - public Datum get(int fieldId) { - Preconditions.checkArgument(fieldId < size, - "Out of field access: " + fieldId); - - if (fieldId < leftSize) { - return left.get(fieldId); - } else { - return right.get(fieldId - leftSize); - } - } - - @Override - public boolean getBool(int fieldId) { - return get(fieldId).asBool(); - } - - @Override - public byte getByte(int fieldId) { - return get(fieldId).asByte(); - } - - 
@Override - public char getChar(int fieldId) { - return get(fieldId).asChar(); - } - - @Override - public byte [] getBytes(int fieldId) { - return get(fieldId).asByteArray(); - } - - @Override - public short getInt2(int fieldId) { - return get(fieldId).asInt2(); - } - - @Override - public int getInt4(int fieldId) { - return get(fieldId).asInt4(); - } - - @Override - public long getInt8(int fieldId) { - return get(fieldId).asInt8(); - } - - @Override - public float getFloat4(int fieldId) { - return get(fieldId).asFloat4(); - } - - @Override - public double getFloat8(int fieldId) { - return get(fieldId).asFloat8(); - } - - @Override - public String getText(int fieldId) { - return get(fieldId).asChars(); - } - - @Override - public ProtobufDatum getProtobufDatum(int fieldId) { - return (ProtobufDatum) get(fieldId); - } - - @Override - public IntervalDatum getInterval(int fieldId) { - return (IntervalDatum) get(fieldId); - } - - @Override - public char [] getUnicodeChars(int fieldId) { - return get(fieldId).asUnicodeChars(); - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - FrameTuple frameTuple = (FrameTuple) super.clone(); - frameTuple.set(this.left.clone(), this.right.clone()); - return frameTuple; - } - - @Override - public Datum[] getValues(){ - throw new UnsupportedException(); - } - - public String toString() { - boolean first = true; - StringBuilder str = new StringBuilder(); - str.append("("); - for(int i=0; i < size(); i++) { - if(contains(i)) { - if(first) { - first = false; - } else { - str.append(", "); - } - str.append(i) - .append("=>") - .append(get(i)); - } - } - str.append(")"); - return str.toString(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java 
b/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java deleted file mode 100644 index 40cad32..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/HashShuffleAppender.java +++ /dev/null @@ -1,209 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
package org.apache.tajo.storage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.statistics.TableStats;
import org.apache.tajo.util.Pair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * An {@link Appender} for hash-shuffle output that records page boundaries and,
 * per task attempt, the tuple-index ranges written into each page, so failed
 * task attempts can be dropped via {@link #taskFinished}. All writes are
 * serialized by synchronizing on the underlying {@link FileAppender}.
 */
public class HashShuffleAppender implements Appender {
  private static Log LOG = LogFactory.getLog(HashShuffleAppender.class);

  private FileAppender appender;
  private AtomicBoolean closed = new AtomicBoolean(false);
  private int partId;

  private TableStats tableStats;

  //<taskId,<page start offset,<task start, task end>>>
  private Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> taskTupleIndexes;

  //page start offset, length
  private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();

  // the page currently being filled: <start offset, length-so-far>
  private Pair<Long, Integer> currentPage;

  // page volume threshold in bytes (the caller converts the configured MB value)
  private int pageSize;

  // number of tuples written into the current page so far
  private int rowNumInPage;

  private int totalRows;

  // final file length, captured at close()
  private long offset;

  private ExecutionBlockId ebId;

  public HashShuffleAppender(ExecutionBlockId ebId, int partId, int pageSize, FileAppender appender) {
    this.ebId = ebId;
    this.partId = partId;
    this.appender = appender;
    this.pageSize = pageSize;
  }

  @Override
  public void init() throws IOException {
    currentPage = new Pair<Long, Integer>(0L, 0);
    taskTupleIndexes = new HashMap<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>>();
    rowNumInPage = 0;
  }

  /**
   * Write multiple tuples. Each tuple is written by a FileAppender which is responsible
   * for the specified partition. After writing, if the current page exceeds pageSize,
   * a new page offset is started.
   *
   * @param taskId the task attempt that produced these tuples
   * @param tuples the tuples to append
   * @return written bytes (0 if already closed)
   * @throws IOException on write failure
   */
  public int addTuples(QueryUnitAttemptId taskId, List<Tuple> tuples) throws IOException {
    synchronized (appender) {
      if (closed.get()) {
        return 0;
      }
      long currentPos = appender.getOffset();

      for (Tuple eachTuple : tuples) {
        appender.addTuple(eachTuple);
      }
      long posAfterWritten = appender.getOffset();

      int writtenBytes = (int) (posAfterWritten - currentPos);

      // remember which tuple range of the current page belongs to this task
      int nextRowNum = rowNumInPage + tuples.size();
      List<Pair<Long, Pair<Integer, Integer>>> taskIndexes = taskTupleIndexes.get(taskId);
      if (taskIndexes == null) {
        taskIndexes = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();
        taskTupleIndexes.put(taskId, taskIndexes);
      }
      taskIndexes.add(new Pair<Long, Pair<Integer, Integer>>(
          currentPage.getFirst(), new Pair<Integer, Integer>(rowNumInPage, nextRowNum)));
      rowNumInPage = nextRowNum;

      // roll to a new page once the current one has grown past the threshold
      if (posAfterWritten - currentPage.getFirst() > pageSize) {
        nextPage(posAfterWritten);
        rowNumInPage = 0;
      }

      totalRows += tuples.size();
      return writtenBytes;
    }
  }

  /** Current file offset; after close() this is the frozen final length. */
  public long getOffset() throws IOException {
    if (closed.get()) {
      return offset;
    } else {
      return appender.getOffset();
    }
  }

  /** Seals the current page at pos and opens a fresh one starting there. */
  private void nextPage(long pos) {
    currentPage.setSecond((int) (pos - currentPage.getFirst()));
    pages.add(currentPage);
    currentPage = new Pair<Long, Integer>(pos, 0);
  }

  @Override
  public void addTuple(Tuple t) throws IOException {
    throw new IOException("Not support addTuple, use addTuples()");
  }

  @Override
  public void flush() throws IOException {
    synchronized (appender) {
      if (closed.get()) {
        return;
      }
      appender.flush();
    }
  }

  /**
   * Rough upper bound of the output size.
   * FIX: widen before multiplying — {@code pageSize * pages.size()} was a pure
   * int multiplication and overflowed beyond 2 GB of output.
   */
  @Override
  public long getEstimatedOutputSize() throws IOException {
    return (long) pageSize * pages.size();
  }

  @Override
  public void close() throws IOException {
    synchronized (appender) {
      if (closed.get()) {
        return;
      }
      appender.flush();
      offset = appender.getOffset();
      // seal the last, partially filled page
      if (offset > currentPage.getFirst()) {
        nextPage(offset);
      }
      appender.close();
      if (LOG.isDebugEnabled()) {
        // FIX: was LOG.info inside an isDebugEnabled() guard; use debug consistently
        if (!pages.isEmpty()) {
          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size()
              + ", lastPage=" + pages.get(pages.size() - 1));
        } else {
          LOG.debug(ebId + ",partId=" + partId + " Appender closed: fileLen=" + offset + ", pages=" + pages.size());
        }
      }
      closed.set(true);
      tableStats = appender.getStats();
    }
  }

  @Override
  public void enableStats() {
  }

  @Override
  public TableStats getStats() {
    synchronized (appender) {
      return appender.getStats();
    }
  }

  public List<Pair<Long, Integer>> getPages() {
    return pages;
  }

  public Map<QueryUnitAttemptId, List<Pair<Long, Pair<Integer, Integer>>>> getTaskTupleIndexes() {
    return taskTupleIndexes;
  }

  /** Flattens the per-task tuple-index lists of all still-registered tasks. */
  public List<Pair<Long, Pair<Integer, Integer>>> getMergedTupleIndexes() {
    List<Pair<Long, Pair<Integer, Integer>>> merged = new ArrayList<Pair<Long, Pair<Integer, Integer>>>();

    for (List<Pair<Long, Pair<Integer, Integer>>> eachFailureIndex : taskTupleIndexes.values()) {
      merged.addAll(eachFailureIndex);
    }

    return merged;
  }

  /** Drops the bookkeeping for a successfully finished task attempt. */
  public void taskFinished(QueryUnitAttemptId taskId) {
    taskTupleIndexes.remove(taskId);
  }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.tajo.ExecutionBlockId;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.conf.TajoConf.ConfVars;
import org.apache.tajo.util.Pair;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Manages one {@link HashShuffleAppender} per (execution block, partition id)
 * pair. Appenders are created lazily in {@link #getAppender}, write to local
 * disk under the worker temporal dir, and are drained into
 * {@link HashShuffleIntermediate} descriptors when an execution block is
 * closed. Compound lookup-then-create/remove sequences on {@code appenderMap}
 * are guarded by synchronizing on the map itself.
 */
public class HashShuffleAppenderManager {
  private static final Log LOG = LogFactory.getLog(HashShuffleAppenderManager.class);

  // ebId -> (partId -> appender metadata); inner maps are created on demand
  private Map<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>> appenderMap =
      new ConcurrentHashMap<ExecutionBlockId, Map<Integer, PartitionAppenderMeta>>();
  private TajoConf systemConf;
  private FileSystem defaultFS;
  private FileSystem localFS;
  private LocalDirAllocator lDirAllocator;
  // page volume threshold in bytes (configured in MB, converted below)
  private int pageSize;

  /**
   * @param systemConf cluster configuration; supplies the temporal dir,
   *                   the Tajo root dir, and the appender page volume
   * @throws IOException if either file system cannot be obtained
   */
  public HashShuffleAppenderManager(TajoConf systemConf) throws IOException {
    this.systemConf = systemConf;

    // initialize LocalDirAllocator
    lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname);

    // initialize DFS and LocalFileSystems
    defaultFS = TajoConf.getTajoRootDir(systemConf).getFileSystem(systemConf);
    localFS = FileSystem.getLocal(systemConf);
    // MB -> bytes
    pageSize = systemConf.getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME) * 1024 * 1024;
  }

  /**
   * Returns the appender for (ebId, partId), creating and initializing it —
   * including its output file and parent directories — on first use.
   * NOTE(review): the appender is created via {@code tajoConf} while the data
   * file path uses {@code systemConf}; confirm the two configs are meant to
   * coexist here.
   *
   * @throws IOException if the data file or appender cannot be created
   */
  public HashShuffleAppender getAppender(TajoConf tajoConf, ExecutionBlockId ebId, int partId,
                                         TableMeta meta, Schema outSchema) throws IOException {
    synchronized (appenderMap) {
      Map<Integer, PartitionAppenderMeta> partitionAppenderMap = appenderMap.get(ebId);

      if (partitionAppenderMap == null) {
        partitionAppenderMap = new ConcurrentHashMap<Integer, PartitionAppenderMeta>();
        appenderMap.put(ebId, partitionAppenderMap);
      }

      PartitionAppenderMeta partitionAppenderMeta = partitionAppenderMap.get(partId);
      if (partitionAppenderMeta == null) {
        Path dataFile = getDataFile(ebId, partId);
        FileSystem fs = dataFile.getFileSystem(systemConf);
        // a leftover file is only reported, not truncated — TODO confirm intended
        if (fs.exists(dataFile)) {
          FileStatus status = fs.getFileStatus(dataFile);
          LOG.info("File " + dataFile + " already exists, size=" + status.getLen());
        }

        if (!fs.exists(dataFile.getParent())) {
          fs.mkdirs(dataFile.getParent());
        }
        FileAppender appender = (FileAppender) StorageManager.getFileStorageManager(
            tajoConf, null).getAppender(meta, outSchema, dataFile);
        appender.enableStats();
        appender.init();

        partitionAppenderMeta = new PartitionAppenderMeta();
        partitionAppenderMeta.partId = partId;
        partitionAppenderMeta.dataFile = dataFile;
        partitionAppenderMeta.appender = new HashShuffleAppender(ebId, partId, pageSize, appender);
        partitionAppenderMeta.appender.init();
        partitionAppenderMap.put(partId, partitionAppenderMeta);

        LOG.info("Create Hash shuffle file(partId=" + partId + "): " + dataFile);
      }

      return partitionAppenderMeta.appender;
    }
  }

  /** Maps a partition id onto one of the configured parent directories. */
  public static int getPartParentId(int partId, TajoConf tajoConf) {
    return partId % tajoConf.getIntVar(TajoConf.ConfVars.HASH_SHUFFLE_PARENT_DIRS);
  }

  /**
   * Builds the local path
   * {@code <queryId>/output/<ebId>/hash-shuffle/<parentId>/<partId>} for a
   * partition's shuffle file. Any failure is wrapped as IOException with cause.
   */
  private Path getDataFile(ExecutionBlockId ebId, int partId) throws IOException {
    try {
      // the base dir for an output dir
      String executionBlockBaseDir = ebId.getQueryId().toString() + "/output" + "/" + ebId.getId() + "/hash-shuffle";
      Path baseDirPath = localFS.makeQualified(lDirAllocator.getLocalPathForWrite(executionBlockBaseDir, systemConf));
      //LOG.info(ebId + "'s basedir is created (" + baseDirPath + ")");

      // If EB has many partition, too many shuffle file are in single directory.
      return StorageUtil.concatPath(baseDirPath, "" + getPartParentId(partId, systemConf), "" + partId);
    } catch (Exception e) {
      LOG.error(e.getMessage(), e);
      throw new IOException(e);
    }
  }

  /**
   * Closes and removes every appender of the given execution block and
   * returns one {@link HashShuffleIntermediate} per partition.
   *
   * @return the intermediates, or null if the block had no hash shuffle output
   * @throws IOException if closing any appender fails (logged, then rethrown)
   */
  public List<HashShuffleIntermediate> close(ExecutionBlockId ebId) throws IOException {
    Map<Integer, PartitionAppenderMeta> partitionAppenderMap = null;
    synchronized (appenderMap) {
      partitionAppenderMap = appenderMap.remove(ebId);
    }

    if (partitionAppenderMap == null) {
      LOG.info("Close HashShuffleAppender:" + ebId + ", not a hash shuffle");
      return null;
    }

    // Send Intermediate data to QueryMaster.
    List<HashShuffleIntermediate> intermEntries = new ArrayList<HashShuffleIntermediate>();
    for (PartitionAppenderMeta eachMeta : partitionAppenderMap.values()) {
      try {
        eachMeta.appender.close();
        HashShuffleIntermediate intermediate =
            new HashShuffleIntermediate(eachMeta.partId, eachMeta.appender.getOffset(),
                eachMeta.appender.getPages(),
                eachMeta.appender.getMergedTupleIndexes());
        intermEntries.add(intermediate);
      } catch (IOException e) {
        LOG.error(e.getMessage(), e);
        throw e;
      }
    }

    LOG.info("Close HashShuffleAppender:" + ebId + ", intermediates=" + intermEntries.size());

    return intermEntries;
  }

  /**
   * Notifies every appender of the task's execution block that the task
   * attempt finished, so its tuple-index bookkeeping can be dropped.
   */
  public void finalizeTask(QueryUnitAttemptId taskId) {
    synchronized (appenderMap) {
      Map<Integer, PartitionAppenderMeta> partitionAppenderMap =
          appenderMap.get(taskId.getQueryUnitId().getExecutionBlockId());
      if (partitionAppenderMap == null) {
        return;
      }

      for (PartitionAppenderMeta eachAppender : partitionAppenderMap.values()) {
        eachAppender.appender.taskFinished(taskId);
      }
    }
  }

  /**
   * Immutable description of one partition's finished shuffle output:
   * its id, total byte volume, page layout, and the tuple-index ranges of
   * task attempts still registered at close time.
   */
  public static class HashShuffleIntermediate {
    private int partId;

    private long volume;

    //[<page start offset,<task start, task end>>]
    private Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes;

    //[<page start offset, length>]
    private List<Pair<Long, Integer>> pages = new ArrayList<Pair<Long, Integer>>();

    public HashShuffleIntermediate(int partId, long volume,
                                   List<Pair<Long, Integer>> pages,
                                   Collection<Pair<Long, Pair<Integer, Integer>>> failureTskTupleIndexes) {
      this.partId = partId;
      this.volume = volume;
      this.failureTskTupleIndexes = failureTskTupleIndexes;
      this.pages = pages;
    }

    public int getPartId() {
      return partId;
    }

    public long getVolume() {
      return volume;
    }

    public Collection<Pair<Long, Pair<Integer, Integer>>> getFailureTskTupleIndexes() {
      return failureTskTupleIndexes;
    }

    public List<Pair<Long, Integer>> getPages() {
      return pages;
    }
  }

  /** Package-private record tying a partition id to its appender and data file. */
  static class PartitionAppenderMeta {
    int partId;
    HashShuffleAppender appender;
    Path dataFile;

    public int getPartId() {
      return partId;
    }

    public HashShuffleAppender getAppender() {
      return appender;
    }

    public Path getDataFile() {
      return dataFile;
    }
  }
}
- */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.IntervalDatum; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.exception.UnsupportedException; - -import java.util.Arrays; - -public class LazyTuple implements Tuple, Cloneable { - private long offset; - private Datum[] values; - private byte[][] textBytes; - private Schema schema; - private byte[] nullBytes; - private SerializerDeserializer serializeDeserialize; - - public LazyTuple(Schema schema, byte[][] textBytes, long offset) { - this(schema, textBytes, offset, NullDatum.get().asTextBytes(), new TextSerializerDeserializer()); - } - - public LazyTuple(Schema schema, byte[][] textBytes, long offset, byte[] nullBytes, SerializerDeserializer serde) { - this.schema = schema; - this.textBytes = textBytes; - this.values = new Datum[schema.size()]; - this.offset = offset; - this.nullBytes = nullBytes; - this.serializeDeserialize = serde; - } - - public LazyTuple(LazyTuple tuple) { - this.values = tuple.getValues(); - this.offset = tuple.offset; - this.schema = tuple.schema; - this.textBytes = new byte[size()][]; - this.nullBytes = tuple.nullBytes; - this.serializeDeserialize = tuple.serializeDeserialize; - } - - @Override - public int size() { - return values.length; - } - - @Override - public boolean contains(int fieldid) { - return textBytes[fieldid] != null || values[fieldid] != null; - } - - @Override - public boolean isNull(int fieldid) { - return get(fieldid).isNull(); - } - - @Override - public boolean isNotNull(int fieldid) { - return !isNull(fieldid); - } - - @Override - public void clear() { - for (int i = 0; i < values.length; i++) { - values[i] = null; - textBytes[i] = null; - } - } - - ////////////////////////////////////////////////////// - // Setter - ////////////////////////////////////////////////////// - @Override - public void put(int 
fieldId, Datum value) { - values[fieldId] = value; - textBytes[fieldId] = null; - } - - @Override - public void put(int fieldId, Datum[] values) { - for (int i = fieldId, j = 0; j < values.length; i++, j++) { - this.values[i] = values[j]; - } - this.textBytes = new byte[values.length][]; - } - - @Override - public void put(int fieldId, Tuple tuple) { - for (int i = fieldId, j = 0; j < tuple.size(); i++, j++) { - values[i] = tuple.get(j); - textBytes[i] = null; - } - } - - @Override - public void put(Datum[] values) { - System.arraycopy(values, 0, this.values, 0, size()); - this.textBytes = new byte[values.length][]; - } - - ////////////////////////////////////////////////////// - // Getter - ////////////////////////////////////////////////////// - @Override - public Datum get(int fieldId) { - if (values[fieldId] != null) - return values[fieldId]; - else if (textBytes.length <= fieldId) { - values[fieldId] = NullDatum.get(); // split error. (col : 3, separator: ',', row text: "a,") - } else if (textBytes[fieldId] != null) { - try { - values[fieldId] = serializeDeserialize.deserialize(schema.getColumn(fieldId), - textBytes[fieldId], 0, textBytes[fieldId].length, nullBytes); - } catch (Exception e) { - values[fieldId] = NullDatum.get(); - } - textBytes[fieldId] = null; - } else { - //non-projection - } - return values[fieldId]; - } - - @Override - public void setOffset(long offset) { - this.offset = offset; - } - - @Override - public long getOffset() { - return this.offset; - } - - @Override - public boolean getBool(int fieldId) { - return get(fieldId).asBool(); - } - - @Override - public byte getByte(int fieldId) { - return get(fieldId).asByte(); - } - - @Override - public char getChar(int fieldId) { - return get(fieldId).asChar(); - } - - @Override - public byte [] getBytes(int fieldId) { - return get(fieldId).asByteArray(); - } - - @Override - public short getInt2(int fieldId) { - return get(fieldId).asInt2(); - } - - @Override - public int getInt4(int fieldId) { - 
return get(fieldId).asInt4(); - } - - @Override - public long getInt8(int fieldId) { - return get(fieldId).asInt8(); - } - - @Override - public float getFloat4(int fieldId) { - return get(fieldId).asFloat4(); - } - - @Override - public double getFloat8(int fieldId) { - return get(fieldId).asFloat8(); - } - - @Override - public String getText(int fieldId) { - return get(fieldId).asChars(); - } - - @Override - public ProtobufDatum getProtobufDatum(int fieldId) { - throw new UnsupportedException(); - } - - @Override - public IntervalDatum getInterval(int fieldId) { - return (IntervalDatum) get(fieldId); - } - - @Override - public char[] getUnicodeChars(int fieldId) { - return get(fieldId).asUnicodeChars(); - } - - public String toString() { - boolean first = true; - StringBuilder str = new StringBuilder(); - str.append("("); - Datum d; - for (int i = 0; i < values.length; i++) { - d = get(i); - if (d != null) { - if (first) { - first = false; - } else { - str.append(", "); - } - str.append(i) - .append("=>") - .append(d); - } - } - str.append(")"); - return str.toString(); - } - - @Override - public int hashCode() { - return Arrays.hashCode(values); - } - - @Override - public Datum[] getValues() { - Datum[] datums = new Datum[values.length]; - for (int i = 0; i < values.length; i++) { - datums[i] = get(i); - } - return datums; - } - - @Override - public Tuple clone() throws CloneNotSupportedException { - LazyTuple lazyTuple = (LazyTuple) super.clone(); - - lazyTuple.values = getValues(); //shallow copy - lazyTuple.textBytes = new byte[size()][]; - return lazyTuple; - } - - @Override - public boolean equals(Object obj) { - if (obj instanceof Tuple) { - Tuple other = (Tuple) obj; - return Arrays.equals(getValues(), other.getValues()); - } - return false; - } -}
