TAJO-1233: Merge hbase_storage branch to the master branch. (Hyoungjun Kim via hyunsik)
Closes #288 Project: http://git-wip-us.apache.org/repos/asf/tajo/repo Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/4561711f Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/4561711f Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/4561711f Branch: refs/heads/master Commit: 4561711f0417eb3ee303f292b8ac44e4ecad8271 Parents: 758927e Author: Hyunsik Choi <[email protected]> Authored: Fri Dec 12 14:02:14 2014 +0900 Committer: Hyunsik Choi <[email protected]> Committed: Fri Dec 12 14:31:15 2014 +0900 ---------------------------------------------------------------------- BUILDING | 2 + CHANGES | 9 + .../org/apache/tajo/catalog/CatalogUtil.java | 2 + .../java/org/apache/tajo/catalog/Schema.java | 1 - .../src/main/proto/CatalogProtos.proto | 13 +- .../tajo-catalog-drivers/tajo-hcatalog/pom.xml | 2 +- tajo-client/pom.xml | 6 +- .../java/org/apache/tajo/cli/tsql/TajoCli.java | 2 +- .../org/apache/tajo/jdbc/TajoResultSet.java | 12 +- .../main/java/org/apache/tajo/QueryVars.java | 58 + .../java/org/apache/tajo/TajoConstants.java | 3 +- .../java/org/apache/tajo/conf/TajoConf.java | 5 + tajo-core/pom.xml | 40 +- .../org/apache/tajo/engine/parser/SQLParser.g4 | 10 +- .../tajo/engine/function/string/ToCharLong.java | 55 + .../apache/tajo/engine/json/CoreGsonHelper.java | 2 +- .../apache/tajo/engine/parser/SQLAnalyzer.java | 18 +- .../engine/planner/PhysicalPlannerImpl.java | 69 +- .../engine/planner/global/ExecutionBlock.java | 2 + .../planner/physical/BSTIndexScanExec.java | 7 +- .../planner/physical/ColPartitionStoreExec.java | 5 +- .../planner/physical/ExternalSortExec.java | 18 +- .../physical/HashShuffleFileWriteExec.java | 2 +- .../physical/PartitionMergeScanExec.java | 7 +- .../planner/physical/PhysicalPlanUtil.java | 9 +- .../physical/RangeShuffleFileWriteExec.java | 6 +- .../engine/planner/physical/SeqScanExec.java | 18 +- .../engine/planner/physical/StoreTableExec.java | 38 +- .../apache/tajo/engine/query/QueryContext.java | 45 +- .../org/apache/tajo/engine/utils/TupleUtil.java | 7 +- .../DefaultFragmentScheduleAlgorithm.java | 8 +- .../tajo/master/DefaultTaskScheduler.java | 5 +- .../org/apache/tajo/master/FragmentPair.java | 14 +- .../org/apache/tajo/master/GlobalEngine.java | 100 +- .../master/GreedyFragmentScheduleAlgorithm.java | 20 +- .../apache/tajo/master/LazyTaskScheduler.java | 26 +- .../master/NonForwardQueryResultScanner.java | 22 +- .../java/org/apache/tajo/master/TajoMaster.java | 7 +- .../tajo/master/TajoMasterClientService.java | 3 +- .../master/event/FragmentScheduleEvent.java | 16 +- .../apache/tajo/master/querymaster/Query.java | 385 +--- .../tajo/master/querymaster/QueryMaster.java | 9 - .../master/querymaster/QueryMasterTask.java | 52 +- .../tajo/master/querymaster/QueryUnit.java | 25 +- .../tajo/master/querymaster/Repartitioner.java | 141 +- .../tajo/master/querymaster/SubQuery.java | 49 +- .../master/rm/TajoWorkerResourceManager.java | 1 + .../java/org/apache/tajo/util/IndexUtil.java | 2 +- .../org/apache/tajo/worker/TajoQueryEngine.java | 4 +- .../main/java/org/apache/tajo/worker/Task.java | 48 +- .../main/resources/webapps/worker/queryunit.jsp | 5 +- .../org/apache/tajo/BackendTestingUtil.java | 2 +- .../org/apache/tajo/HBaseTestClusterUtil.java | 182 ++ .../java/org/apache/tajo/QueryTestCaseBase.java | 8 +- .../org/apache/tajo/TajoTestingCluster.java | 12 +- .../tajo/engine/planner/TestPlannerUtil.java | 4 +- .../planner/global/TestBroadcastJoinPlan.java | 9 +- .../planner/physical/TestBNLJoinExec.java | 20 +- .../planner/physical/TestBSTIndexExec.java | 20 +- .../planner/physical/TestExternalSortExec.java | 14 +- .../physical/TestFullOuterHashJoinExec.java | 41 +- .../physical/TestFullOuterMergeJoinExec.java | 64 +- .../planner/physical/TestHashAntiJoinExec.java | 15 +- .../planner/physical/TestHashJoinExec.java | 21 +- .../planner/physical/TestHashSemiJoinExec.java | 20 +- .../physical/TestLeftOuterHashJoinExec.java | 45 +- .../physical/TestLeftOuterNLJoinExec.java | 48 +- .../planner/physical/TestMergeJoinExec.java | 15 +- .../engine/planner/physical/TestNLJoinExec.java | 20 +- .../planner/physical/TestPhysicalPlanner.java | 135 +- .../physical/TestProgressExternalSortExec.java | 12 +- .../physical/TestRightOuterHashJoinExec.java | 34 +- .../physical/TestRightOuterMergeJoinExec.java | 54 +- .../engine/planner/physical/TestSortExec.java | 11 +- .../tajo/engine/query/TestHBaseTable.java | 1469 ++++++++++++++ .../tajo/engine/query/TestJoinBroadcast.java | 4 +- .../org/apache/tajo/jdbc/TestResultSet.java | 7 +- .../tajo/master/TestExecutionBlockCursor.java | 2 +- .../apache/tajo/storage/TestFileFragment.java | 4 +- .../org/apache/tajo/storage/TestRowFile.java | 7 +- .../tajo/worker/TestRangeRetrieverHandler.java | 22 +- .../dataset/TestHBaseTable/splits.data | 4 + .../TestHBaseTable/testBinaryMappedQuery.result | 81 + .../results/TestHBaseTable/testCATS.result | 100 + .../testColumnKeyValueSelectQuery.result | 12 + .../TestHBaseTable/testIndexPredication.result | 38 + .../TestHBaseTable/testInsertInto.result | 3 + .../testInsertIntoBinaryMultiRegion.result | 100 + .../testInsertIntoColumnKeyValue.result | 21 + .../testInsertIntoMultiRegion.result | 100 + .../testInsertIntoMultiRegion2.result | 100 + ...stInsertIntoMultiRegionMultiRowFields.result | 100 + ...estInsertIntoMultiRegionWithSplitFile.result | 100 + .../testInsertIntoRowField.result | 4 + .../testInsertIntoUsingPut.result | 3 + .../results/TestHBaseTable/testJoin.result | 7 + .../TestHBaseTable/testNonForwardQuery.result | 102 + .../testRowFieldSelectQuery.result | 88 + .../TestHBaseTable/testSimpleSelectQuery.result | 88 + tajo-dist/pom.xml | 7 +- tajo-dist/src/main/bin/tajo | 15 + tajo-dist/src/main/conf/tajo-env.sh | 3 + tajo-jdbc/pom.xml | 6 +- .../org/apache/tajo/plan/LogicalOptimizer.java | 7 + .../apache/tajo/plan/logical/InsertNode.java | 4 +- .../org/apache/tajo/plan/logical/SortNode.java | 20 +- .../rewrite/rules/PartitionedTableRewriter.java | 2 +- .../org/apache/tajo/plan/util/PlannerUtil.java | 109 +- tajo-project/pom.xml | 17 + tajo-pullserver/pom.xml | 7 +- tajo-storage/pom.xml | 310 +-- .../java/org/apache/tajo/storage/Appender.java | 41 - .../tajo/storage/BaseTupleComparator.java | 206 -- .../storage/BinarySerializerDeserializer.java | 258 --- .../org/apache/tajo/storage/BufferPool.java | 74 - .../tajo/storage/ByteBufInputChannel.java | 72 - .../java/org/apache/tajo/storage/CSVFile.java | 586 ------ .../tajo/storage/CompressedSplitLineReader.java | 182 -- .../org/apache/tajo/storage/DataLocation.java | 45 - .../org/apache/tajo/storage/DiskDeviceInfo.java | 62 - .../java/org/apache/tajo/storage/DiskInfo.java | 75 - .../org/apache/tajo/storage/DiskMountInfo.java | 101 - .../java/org/apache/tajo/storage/DiskUtil.java | 207 -- .../storage/FieldSerializerDeserializer.java | 37 - .../org/apache/tajo/storage/FileAppender.java | 65 - .../org/apache/tajo/storage/FileScanner.java | 123 -- .../org/apache/tajo/storage/FrameTuple.java | 225 --- .../tajo/storage/HashShuffleAppender.java | 209 -- .../storage/HashShuffleAppenderManager.java | 226 --- .../java/org/apache/tajo/storage/LazyTuple.java | 270 --- .../org/apache/tajo/storage/LineReader.java | 559 ------ .../org/apache/tajo/storage/MemoryUtil.java | 163 -- .../org/apache/tajo/storage/MergeScanner.java | 198 -- .../org/apache/tajo/storage/NullScanner.java | 62 - .../tajo/storage/NumericPathComparator.java | 34 - .../java/org/apache/tajo/storage/RawFile.java | 772 -------- .../java/org/apache/tajo/storage/RowFile.java | 496 ----- .../org/apache/tajo/storage/RowStoreUtil.java | 377 ---- .../java/org/apache/tajo/storage/Scanner.java | 103 - .../apache/tajo/storage/SeekableScanner.java | 28 - .../tajo/storage/SerializerDeserializer.java | 34 - .../apache/tajo/storage/SplitLineReader.java | 39 - .../java/org/apache/tajo/storage/Storage.java | 45 - .../org/apache/tajo/storage/StorageManager.java | 812 -------- .../org/apache/tajo/storage/StorageUtil.java | 224 --- .../apache/tajo/storage/TableStatistics.java | 129 -- .../storage/TextSerializerDeserializer.java | 227 --- .../apache/tajo/storage/TupleComparator.java | 32 - .../org/apache/tajo/storage/TupleRange.java | 112 -- .../storage/annotation/ForSplitableStore.java | 29 - .../apache/tajo/storage/avro/AvroAppender.java | 219 --- .../apache/tajo/storage/avro/AvroScanner.java | 286 --- .../org/apache/tajo/storage/avro/AvroUtil.java | 77 - .../apache/tajo/storage/avro/package-info.java | 85 - .../apache/tajo/storage/compress/CodecPool.java | 185 -- .../AlreadyExistsStorageException.java | 39 - .../exception/UnknownCodecException.java | 32 - .../exception/UnknownDataTypeException.java | 32 - .../exception/UnsupportedFileTypeException.java | 36 - .../tajo/storage/fragment/FileFragment.java | 224 --- .../apache/tajo/storage/fragment/Fragment.java | 31 - .../storage/fragment/FragmentConvertor.java | 132 -- .../apache/tajo/storage/index/IndexMethod.java | 33 - .../apache/tajo/storage/index/IndexReader.java | 35 - .../apache/tajo/storage/index/IndexWriter.java | 33 - .../tajo/storage/index/OrderIndexReader.java | 45 - .../apache/tajo/storage/index/bst/BSTIndex.java | 623 ------ .../tajo/storage/json/JsonLineDeserializer.java | 225 --- .../apache/tajo/storage/json/JsonLineSerDe.java | 37 - .../tajo/storage/json/JsonLineSerializer.java | 134 -- .../tajo/storage/parquet/ParquetAppender.java | 150 -- .../tajo/storage/parquet/ParquetScanner.java | 119 -- .../tajo/storage/parquet/TajoParquetReader.java | 85 - .../tajo/storage/parquet/TajoParquetWriter.java | 104 - .../tajo/storage/parquet/TajoReadSupport.java | 101 - .../storage/parquet/TajoRecordConverter.java | 380 ---- .../storage/parquet/TajoRecordMaterializer.java | 78 - .../storage/parquet/TajoSchemaConverter.java | 206 -- .../tajo/storage/parquet/TajoWriteSupport.java | 148 -- .../tajo/storage/parquet/package-info.java | 96 - .../storage/rcfile/BytesRefArrayWritable.java | 261 --- .../tajo/storage/rcfile/BytesRefWritable.java | 248 --- .../storage/rcfile/ColumnProjectionUtils.java | 117 -- .../rcfile/LazyDecompressionCallback.java | 32 - .../rcfile/NonSyncByteArrayInputStream.java | 113 -- .../rcfile/NonSyncByteArrayOutputStream.java | 144 -- .../storage/rcfile/NonSyncDataInputBuffer.java | 507 ----- .../storage/rcfile/NonSyncDataOutputBuffer.java | 91 - .../org/apache/tajo/storage/rcfile/RCFile.java | 1805 ----------------- .../SchemaAwareCompressionInputStream.java | 43 - .../SchemaAwareCompressionOutputStream.java | 44 - .../sequencefile/SequenceFileAppender.java | 272 --- .../sequencefile/SequenceFileScanner.java | 336 ---- .../tajo/storage/text/ByteBufLineReader.java | 182 -- .../tajo/storage/text/CSVLineDeserializer.java | 96 - .../apache/tajo/storage/text/CSVLineSerDe.java | 41 - .../tajo/storage/text/CSVLineSerializer.java | 71 - .../tajo/storage/text/DelimitedLineReader.java | 156 -- .../tajo/storage/text/DelimitedTextFile.java | 478 ----- .../tajo/storage/text/FieldSplitProcessor.java | 38 - .../tajo/storage/text/LineSplitProcessor.java | 45 - .../text/TextFieldSerializerDeserializer.java | 253 --- .../tajo/storage/text/TextLineDeserializer.java | 60 - .../tajo/storage/text/TextLineParsingError.java | 31 - .../apache/tajo/storage/text/TextLineSerDe.java | 65 - .../tajo/storage/text/TextLineSerializer.java | 45 - .../thirdparty/parquet/CodecFactory.java | 196 -- .../parquet/ColumnChunkPageWriteStore.java | 211 -- .../parquet/InternalParquetRecordReader.java | 187 -- .../parquet/InternalParquetRecordWriter.java | 160 -- .../thirdparty/parquet/ParquetFileWriter.java | 504 ----- .../thirdparty/parquet/ParquetReader.java | 151 -- .../thirdparty/parquet/ParquetWriter.java | 224 --- .../org/apache/tajo/tuple/BaseTupleBuilder.java | 112 -- .../org/apache/tajo/tuple/RowBlockReader.java | 33 - .../org/apache/tajo/tuple/TupleBuilder.java | 26 - .../tajo/tuple/offheap/DirectBufTuple.java | 41 - .../tajo/tuple/offheap/FixedSizeLimitSpec.java | 32 - .../apache/tajo/tuple/offheap/HeapTuple.java | 272 --- .../tajo/tuple/offheap/OffHeapMemory.java | 102 - .../tajo/tuple/offheap/OffHeapRowBlock.java | 176 -- .../tuple/offheap/OffHeapRowBlockReader.java | 63 - .../tuple/offheap/OffHeapRowBlockUtils.java | 54 - .../tuple/offheap/OffHeapRowBlockWriter.java | 58 - .../tajo/tuple/offheap/OffHeapRowWriter.java | 232 --- .../tajo/tuple/offheap/ResizableLimitSpec.java | 142 -- .../apache/tajo/tuple/offheap/RowWriter.java | 73 - .../apache/tajo/tuple/offheap/UnSafeTuple.java | 311 --- .../offheap/UnSafeTupleBytesComparator.java | 99 - .../tajo/tuple/offheap/ZeroCopyTuple.java | 35 - tajo-storage/src/main/proto/IndexProtos.proto | 31 - .../src/main/resources/storage-default.xml | 175 -- .../java/org/apache/tajo/HttpFileServer.java | 84 - .../org/apache/tajo/HttpFileServerHandler.java | 184 -- .../tajo/HttpFileServerPipelineFactory.java | 54 - .../tajo/storage/TestCompressionStorages.java | 185 -- .../tajo/storage/TestDelimitedTextFile.java | 164 -- .../apache/tajo/storage/TestFileSystems.java | 138 -- .../org/apache/tajo/storage/TestFrameTuple.java | 84 - .../org/apache/tajo/storage/TestLazyTuple.java | 258 --- .../org/apache/tajo/storage/TestLineReader.java | 220 --- .../apache/tajo/storage/TestMergeScanner.java | 201 -- .../apache/tajo/storage/TestSplitProcessor.java | 72 - .../apache/tajo/storage/TestStorageManager.java | 202 -- .../org/apache/tajo/storage/TestStorages.java | 868 --------- .../tajo/storage/TestTupleComparator.java | 77 - .../org/apache/tajo/storage/TestVTuple.java | 160 -- .../apache/tajo/storage/avro/TestAvroUtil.java | 108 -- .../apache/tajo/storage/index/TestBSTIndex.java | 946 --------- .../index/TestSingleCSVFileBSTIndex.java | 248 --- .../apache/tajo/storage/json/TestJsonSerDe.java | 101 - .../tajo/storage/parquet/TestReadWrite.java | 114 -- .../storage/parquet/TestSchemaConverter.java | 132 -- .../apache/tajo/tuple/TestBaseTupleBuilder.java | 76 - .../tajo/tuple/offheap/TestHeapTuple.java | 45 - .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 ------ .../tajo/tuple/offheap/TestResizableSpec.java | 59 - .../testErrorTolerance1.json | 6 - .../testErrorTolerance2.json | 4 - .../dataset/TestJsonSerDe/testVariousType.json | 1 - .../src/test/resources/dataset/testLineText.txt | 2 - .../resources/dataset/testVariousTypes.avsc | 20 - .../src/test/resources/storage-default.xml | 168 -- tajo-storage/tajo-storage-common/pom.xml | 337 ++++ .../java/org/apache/tajo/storage/Appender.java | 41 + .../tajo/storage/BaseTupleComparator.java | 206 ++ .../storage/BinarySerializerDeserializer.java | 258 +++ .../org/apache/tajo/storage/BufferPool.java | 74 + .../tajo/storage/ByteBufInputChannel.java | 72 + .../org/apache/tajo/storage/DataLocation.java | 45 + .../org/apache/tajo/storage/DiskDeviceInfo.java | 62 + .../java/org/apache/tajo/storage/DiskInfo.java | 75 + .../org/apache/tajo/storage/DiskMountInfo.java | 101 + .../java/org/apache/tajo/storage/DiskUtil.java | 207 ++ .../org/apache/tajo/storage/FrameTuple.java | 225 +++ .../java/org/apache/tajo/storage/LazyTuple.java | 270 +++ .../org/apache/tajo/storage/MemoryUtil.java | 163 ++ .../org/apache/tajo/storage/MergeScanner.java | 201 ++ .../org/apache/tajo/storage/NullScanner.java | 109 ++ .../tajo/storage/NumericPathComparator.java | 34 + .../org/apache/tajo/storage/RowStoreUtil.java | 377 ++++ .../java/org/apache/tajo/storage/Scanner.java | 103 + .../apache/tajo/storage/SeekableScanner.java | 28 + .../tajo/storage/SerializerDeserializer.java | 34 + .../org/apache/tajo/storage/StorageManager.java | 979 ++++++++++ .../apache/tajo/storage/StorageProperty.java | 40 + .../org/apache/tajo/storage/StorageUtil.java | 220 +++ .../apache/tajo/storage/TableStatistics.java | 129 ++ .../storage/TextSerializerDeserializer.java | 227 +++ .../apache/tajo/storage/TupleComparator.java | 32 + .../org/apache/tajo/storage/TupleRange.java | 112 ++ .../storage/annotation/ForSplitableStore.java | 29 + .../apache/tajo/storage/compress/CodecPool.java | 185 ++ .../AlreadyExistsStorageException.java | 39 + .../exception/UnknownCodecException.java | 32 + .../exception/UnknownDataTypeException.java | 32 + .../exception/UnsupportedFileTypeException.java | 36 + .../apache/tajo/storage/fragment/Fragment.java | 39 + .../storage/fragment/FragmentConvertor.java | 129 ++ .../org/apache/tajo/tuple/BaseTupleBuilder.java | 112 ++ .../org/apache/tajo/tuple/RowBlockReader.java | 33 + .../org/apache/tajo/tuple/TupleBuilder.java | 26 + .../tajo/tuple/offheap/DirectBufTuple.java | 41 + .../tajo/tuple/offheap/FixedSizeLimitSpec.java | 32 + .../apache/tajo/tuple/offheap/HeapTuple.java | 272 +++ .../tajo/tuple/offheap/OffHeapMemory.java | 102 + .../tajo/tuple/offheap/OffHeapRowBlock.java | 176 ++ .../tuple/offheap/OffHeapRowBlockReader.java | 63 + .../tuple/offheap/OffHeapRowBlockUtils.java | 54 + .../tuple/offheap/OffHeapRowBlockWriter.java | 58 + .../tajo/tuple/offheap/OffHeapRowWriter.java | 232 +++ .../tajo/tuple/offheap/ResizableLimitSpec.java | 142 ++ .../apache/tajo/tuple/offheap/RowWriter.java | 73 + .../apache/tajo/tuple/offheap/UnSafeTuple.java | 311 +++ .../offheap/UnSafeTupleBytesComparator.java | 99 + .../tajo/tuple/offheap/ZeroCopyTuple.java | 35 + .../src/main/proto/IndexProtos.proto | 31 + .../src/main/resources/storage-default.xml | 198 ++ .../org/apache/tajo/storage/TestFrameTuple.java | 84 + .../org/apache/tajo/storage/TestLazyTuple.java | 258 +++ .../tajo/storage/TestTupleComparator.java | 77 + .../org/apache/tajo/storage/TestVTuple.java | 160 ++ .../apache/tajo/tuple/TestBaseTupleBuilder.java | 76 + .../tajo/tuple/offheap/TestHeapTuple.java | 45 + .../tajo/tuple/offheap/TestOffHeapRowBlock.java | 577 ++++++ .../tajo/tuple/offheap/TestResizableSpec.java | 59 + .../src/test/resources/storage-default.xml | 164 ++ tajo-storage/tajo-storage-hbase/pom.xml | 349 ++++ .../storage/hbase/AbstractHBaseAppender.java | 223 +++ .../storage/hbase/AddSortForInsertRewriter.java | 90 + .../tajo/storage/hbase/ColumnMapping.java | 236 +++ .../HBaseBinarySerializerDeserializer.java | 97 + .../tajo/storage/hbase/HBaseFragment.java | 198 ++ .../tajo/storage/hbase/HBasePutAppender.java | 120 ++ .../apache/tajo/storage/hbase/HBaseScanner.java | 449 +++++ .../storage/hbase/HBaseStorageConstants.java | 33 + .../tajo/storage/hbase/HBaseStorageManager.java | 1135 +++++++++++ .../hbase/HBaseTextSerializerDeserializer.java | 71 + .../tajo/storage/hbase/HFileAppender.java | 167 ++ .../tajo/storage/hbase/IndexPredication.java | 61 + .../tajo/storage/hbase/RowKeyMapping.java | 40 + .../src/main/proto/StorageFragmentProtos.proto | 35 + .../tajo/storage/hbase/TestColumnMapping.java | 93 + .../storage/hbase/TestHBaseStorageManager.java | 109 ++ tajo-storage/tajo-storage-hdfs/pom.xml | 385 ++++ .../java/org/apache/tajo/storage/CSVFile.java | 587 ++++++ .../tajo/storage/CompressedSplitLineReader.java | 182 ++ .../storage/FieldSerializerDeserializer.java | 37 + .../org/apache/tajo/storage/FileAppender.java | 87 + .../org/apache/tajo/storage/FileScanner.java | 124 ++ .../apache/tajo/storage/FileStorageManager.java | 882 +++++++++ .../tajo/storage/HashShuffleAppender.java | 209 ++ .../storage/HashShuffleAppenderManager.java | 225 +++ .../org/apache/tajo/storage/LineReader.java | 559 ++++++ .../java/org/apache/tajo/storage/RawFile.java | 773 ++++++++ .../java/org/apache/tajo/storage/RowFile.java | 498 +++++ .../apache/tajo/storage/SplitLineReader.java | 39 + .../apache/tajo/storage/avro/AvroAppender.java | 220 +++ .../apache/tajo/storage/avro/AvroScanner.java | 286 +++ .../org/apache/tajo/storage/avro/AvroUtil.java | 77 + .../apache/tajo/storage/avro/package-info.java | 85 + .../tajo/storage/fragment/FileFragment.java | 237 +++ .../apache/tajo/storage/index/IndexMethod.java | 32 + .../apache/tajo/storage/index/IndexReader.java | 35 + .../apache/tajo/storage/index/IndexWriter.java | 33 + .../tajo/storage/index/OrderIndexReader.java | 45 + .../apache/tajo/storage/index/bst/BSTIndex.java | 623 ++++++ .../tajo/storage/json/JsonLineDeserializer.java | 225 +++ .../apache/tajo/storage/json/JsonLineSerDe.java | 37 + .../tajo/storage/json/JsonLineSerializer.java | 130 ++ .../tajo/storage/parquet/ParquetAppender.java | 151 ++ .../tajo/storage/parquet/ParquetScanner.java | 119 ++ .../tajo/storage/parquet/TajoParquetReader.java | 85 + .../tajo/storage/parquet/TajoParquetWriter.java | 104 + .../tajo/storage/parquet/TajoReadSupport.java | 99 + .../storage/parquet/TajoRecordConverter.java | 380 ++++ .../storage/parquet/TajoRecordMaterializer.java | 77 + .../storage/parquet/TajoSchemaConverter.java | 206 ++ .../tajo/storage/parquet/TajoWriteSupport.java | 148 ++ .../tajo/storage/parquet/package-info.java | 96 + .../storage/rcfile/BytesRefArrayWritable.java | 261 +++ .../tajo/storage/rcfile/BytesRefWritable.java | 248 +++ .../storage/rcfile/ColumnProjectionUtils.java | 117 ++ .../rcfile/LazyDecompressionCallback.java | 32 + .../rcfile/NonSyncByteArrayInputStream.java | 113 ++ .../rcfile/NonSyncByteArrayOutputStream.java | 144 ++ .../storage/rcfile/NonSyncDataInputBuffer.java | 507 +++++ .../storage/rcfile/NonSyncDataOutputBuffer.java | 91 + .../org/apache/tajo/storage/rcfile/RCFile.java | 1807 ++++++++++++++++++ .../SchemaAwareCompressionInputStream.java | 43 + .../SchemaAwareCompressionOutputStream.java | 44 + .../sequencefile/SequenceFileAppender.java | 274 +++ .../sequencefile/SequenceFileScanner.java | 336 ++++ .../tajo/storage/text/ByteBufLineReader.java | 182 ++ .../tajo/storage/text/CSVLineDeserializer.java | 96 + .../apache/tajo/storage/text/CSVLineSerDe.java | 41 + .../tajo/storage/text/CSVLineSerializer.java | 70 + .../tajo/storage/text/DelimitedLineReader.java | 156 ++ .../tajo/storage/text/DelimitedTextFile.java | 481 +++++ .../tajo/storage/text/FieldSplitProcessor.java | 38 + .../tajo/storage/text/LineSplitProcessor.java | 45 + .../text/TextFieldSerializerDeserializer.java | 253 +++ .../tajo/storage/text/TextLineDeserializer.java | 60 + .../tajo/storage/text/TextLineParsingError.java | 31 + .../apache/tajo/storage/text/TextLineSerDe.java | 65 + .../tajo/storage/text/TextLineSerializer.java | 45 + .../thirdparty/parquet/CodecFactory.java | 190 ++ .../parquet/ColumnChunkPageWriteStore.java | 206 ++ .../parquet/InternalParquetRecordReader.java | 188 ++ .../parquet/InternalParquetRecordWriter.java | 160 ++ .../thirdparty/parquet/ParquetFileWriter.java | 492 +++++ .../thirdparty/parquet/ParquetReader.java | 146 ++ .../thirdparty/parquet/ParquetWriter.java | 224 +++ .../src/main/proto/StorageFragmentProtos.proto | 34 + .../java/org/apache/tajo/HttpFileServer.java | 84 + .../org/apache/tajo/HttpFileServerHandler.java | 184 ++ .../tajo/HttpFileServerPipelineFactory.java | 54 + .../tajo/storage/TestCompressionStorages.java | 185 ++ .../tajo/storage/TestDelimitedTextFile.java | 163 ++ .../tajo/storage/TestFileStorageManager.java | 203 ++ .../apache/tajo/storage/TestFileSystems.java | 137 ++ .../org/apache/tajo/storage/TestLineReader.java | 220 +++ .../apache/tajo/storage/TestMergeScanner.java | 202 ++ .../apache/tajo/storage/TestSplitProcessor.java | 72 + .../org/apache/tajo/storage/TestStorages.java | 878 +++++++++ .../apache/tajo/storage/avro/TestAvroUtil.java | 106 + .../apache/tajo/storage/index/TestBSTIndex.java | 947 +++++++++ .../index/TestSingleCSVFileBSTIndex.java | 248 +++ .../apache/tajo/storage/json/TestJsonSerDe.java | 101 + .../tajo/storage/parquet/TestReadWrite.java | 109 ++ .../storage/parquet/TestSchemaConverter.java | 130 ++ .../testErrorTolerance1.json | 6 + .../testErrorTolerance2.json | 4 + .../dataset/TestJsonSerDe/testVariousType.json | 1 + .../src/test/resources/dataset/testLineText.txt | 2 + .../resources/dataset/testVariousTypes.avsc | 20 + .../src/test/resources/storage-default.xml | 178 ++ 437 files changed, 35844 insertions(+), 27780 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/BUILDING ---------------------------------------------------------------------- diff --git a/BUILDING b/BUILDING index 8606d80..0c4bbf8 100644 --- a/BUILDING +++ b/BUILDING @@ -44,6 +44,8 @@ Maven build goals: * Use -Dtar to create a TAR with the distribution (using -Pdist) * Use -Dhadoop.version to build with the specific hadoop version (-Dhadoop.version=2.5.1) * Currently, 2.3.0 or higher are supported. + * Use -Dhbase.version to build with the specific hbase version (-Dhbase.version=0.98.7-hadoop2) + * Currently, 0.98.x-hadoop2 or higher are tested. Tests options: * Use -DskipTests to skip tests when running the following Maven goals: http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/CHANGES ---------------------------------------------------------------------- diff --git a/CHANGES b/CHANGES index ac13ec5..2df5185 100644 --- a/CHANGES +++ b/CHANGES @@ -10,6 +10,10 @@ Release 0.9.1 - unreleased TAJO-1222: DelimitedTextFile should be tolerant against parsing errors. (hyunsik) + TAJO-1131: Supports Inserting or Creating table into + the HBase mapped table.(Hyoungjun Kim) + + TAJO-1026: Implement Query history persistency manager.(Hyoungjun Kim) TAJO-233: Support PostgreSQL CatalogStore. (Jihun Kang via hyunsik) @@ -35,6 +39,9 @@ Release 0.9.1 - unreleased TAJO-1213: Implement CatalogStore::updateTableStats. (jaehwa) + TAJO-1122: Refactor the tajo-storage project structure. + (Hyoungjun Kim) + TAJO-1165: Needs to show error messages on query_executor.jsp. (Jihun Kang via jaehwa) @@ -69,6 +76,8 @@ Release 0.9.1 - unreleased TAJO-1125: Separate logical plan and optimizer into a maven module. (hyunsik) + TAJO-1123: Use Fragment instead of FileFragment.(Hyoungjun Kim) + TAJO-1092: Improve the function system to allow other function implementation types. (hyunsik) http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index df2112e..cbcac63 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -283,6 +283,8 @@ public class CatalogUtil { return StoreType.TEXTFILE; } else if (typeStr.equalsIgnoreCase(StoreType.JSON.name())) { return StoreType.JSON; + } else if (typeStr.equalsIgnoreCase(StoreType.HBASE.name())) { + return StoreType.HBASE; } else { return null; } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java index 296230b..672b8e3 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/Schema.java @@ -25,7 +25,6 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.catalog.exception.AlreadyExistsFieldException; import org.apache.tajo.catalog.json.CatalogGsonHelper; -import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.ColumnProto; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; import org.apache.tajo.common.ProtoObject; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index 22c08d8..946b563 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -37,6 +37,7 @@ enum StoreType { AVRO = 9; TEXTFILE = 10; JSON = 11; + HBASE = 12; } enum OrderType { @@ -69,16 +70,8 @@ message SchemaProto { message FragmentProto { required string id = 1; - required bytes contents = 2; -} - -message FileFragmentProto { - required string id = 1; - required string path = 2; - required int64 startOffset = 3; - required int64 length = 4; - repeated string hosts = 7; - repeated int32 diskIds = 8; + required string storeType = 2; + required bytes contents = 3; } message TableProto { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml index 2950a96..4187150 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/pom.xml @@ -113,7 +113,7 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> + <artifactId>tajo-storage-common</artifactId> </dependency> <dependency> <groupId>junit</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-client/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-client/pom.xml b/tajo-client/pom.xml index 7fc8c74..c023db8 100644 --- a/tajo-client/pom.xml +++ b/tajo-client/pom.xml @@ -195,7 +195,11 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> + <artifactId>tajo-storage-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> </dependency> <dependency> <groupId>org.apache.tajo</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java index d4886cf..6c5006e 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/tsql/TajoCli.java @@ -25,13 +25,13 @@ import jline.UnsupportedTerminal; import jline.console.ConsoleReader; import org.apache.commons.cli.*; import org.apache.tajo.*; +import org.apache.tajo.ipc.*; import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.cli.tsql.commands.*; import org.apache.tajo.client.*; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.util.FileUtil; import java.io.*; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java ---------------------------------------------------------------------- diff --git a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java index e2ac3b1..bf33082 100644 --- a/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java +++ b/tajo-client/src/main/java/org/apache/tajo/jdbc/TajoResultSet.java @@ -30,11 +30,9 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryClient; import org.apache.tajo.client.TajoClient; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.MergeScanner; -import org.apache.tajo.storage.Scanner; -import org.apache.tajo.storage.Tuple; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import java.io.IOException; import java.sql.SQLException; @@ -94,7 +92,7 @@ public class TajoResultSet extends TajoResultSetBase { totalRow = INFINITE_ROW_NUM; } - List<FileFragment> frags = getFragments(new Path(desc.getPath())); + List<Fragment> frags = getFragments(new Path(desc.getPath())); scanner = new MergeScanner(conf, desc.getSchema(), desc.getMeta(), frags); } } @@ -113,9 +111,9 @@ public class TajoResultSet extends TajoResultSetBase { } } - private List<FileFragment> getFragments(Path tablePath) + private List<Fragment> getFragments(Path tablePath) throws IOException { - List<FileFragment> fragments = Lists.newArrayList(); + List<Fragment> fragments = Lists.newArrayList(); FileStatus[] files = fs.listStatus(tablePath, new PathFilter() { @Override public boolean accept(Path path) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-common/src/main/java/org/apache/tajo/QueryVars.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/QueryVars.java b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java new file mode 100644 index 0000000..ba76d63 --- /dev/null +++ b/tajo-common/src/main/java/org/apache/tajo/QueryVars.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo; + +import org.apache.tajo.validation.Validator; + +public enum QueryVars implements ConfigKey { + COMMAND_TYPE, + STAGING_DIR, + OUTPUT_TABLE_NAME, + OUTPUT_TABLE_PATH, + OUTPUT_PARTITIONS, + OUTPUT_OVERWRITE, + OUTPUT_AS_DIRECTORY, + OUTPUT_PER_FILE_SIZE, + ; + + QueryVars() { + } + + @Override + public String keyname() { + return name().toLowerCase(); + } + + @Override + public ConfigType type() { + return ConfigType.QUERY; + } + + @Override + public Class<?> valueClass() { + return null; + } + + @Override + public Validator validator() { + return null; + } +} + + http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java index 9250edd..15336bb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/TajoConstants.java @@ -39,8 +39,9 @@ public class TajoConstants { public static final String SYSTEM_HA_DIR_NAME = "ha"; public static final String SYSTEM_HA_ACTIVE_DIR_NAME = "active"; public static final String SYSTEM_HA_BACKUP_DIR_NAME = "backup"; - public static final int UNKNOWN_ROW_NUMBER = -1; + public static final int UNKNOWN_ROW_NUMBER = -1; + public static final int UNKNOWN_LENGTH = -1; private TajoConstants() {} } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index ed38cdc..d0c6460 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -263,6 +263,11 @@ public class TajoConf extends Configuration { HISTORY_EXPIRY_TIME_DAY("tajo.history.expiry-time-day", 7), // Misc ------------------------------------------------------------------- + // Fragment + // When making physical plan, the length of fragment is used to determine the physical operation. + // Some storage does not know the size of the fragment. + // In this case PhysicalPlanner uses this value to determine. + FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH("tajo.fragment.alternative.unknown.length", (long)(512 * 1024 * 1024)), // Geo IP GEOIP_DATA("tajo.function.geoip-database-location", ""), http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index 7322219..a6fb5a5 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -247,7 +247,16 @@ </dependency> <dependency> <groupId>org.apache.tajo</groupId> - <artifactId>tajo-storage</artifactId> + <artifactId>tajo-storage-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-hbase</artifactId> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.tajo</groupId> @@ -398,6 +407,33 @@ <artifactId>gmetric4j</artifactId> <version>1.0.3</version> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <version>${hbase.version}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop-compat</artifactId> + <version>${hbase.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-hadoop2-compat</artifactId> + <version>${hbase.version}</version> + <type>test-jar</type> + <scope>test</scope> + </dependency> </dependencies> <profiles> @@ -457,8 +493,8 @@ <dependency> <groupId>org.apache.tajo</groupId> <artifactId>tajo-hcatalog</artifactId> - <scope>test</scope> <version>${tajo.version}</version> + <scope>test</scope> <exclusions> <exclusion> <groupId>com.google.protobuf</groupId> http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 index 76afb6b..f3cd298 100644 --- a/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 +++ b/tajo-core/src/main/antlr4/org/apache/tajo/engine/parser/SQLParser.g4 @@ -98,11 +98,11 @@ if_exists ; create_table_statement - : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING file_type=identifier - (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal) - | CREATE TABLE (if_not_exists)? table_name table_elements (USING file_type=identifier)? + : CREATE EXTERNAL TABLE (if_not_exists)? table_name table_elements USING storage_type=identifier + (param_clause)? (table_partitioning_clauses)? (LOCATION path=Character_String_Literal)? + | CREATE TABLE (if_not_exists)? table_name table_elements (USING storage_type=identifier)? (param_clause)? (table_partitioning_clauses)? (AS query_expression)? - | CREATE TABLE (if_not_exists)? table_name (USING file_type=identifier)? + | CREATE TABLE (if_not_exists)? table_name (USING storage_type=identifier)? (param_clause)? (table_partitioning_clauses)? AS query_expression | CREATE TABLE (if_not_exists)? table_name LIKE like_table_name=table_name ; @@ -1572,7 +1572,7 @@ null_ordering insert_statement : INSERT (OVERWRITE)? INTO table_name (LEFT_PAREN column_name_list RIGHT_PAREN)? query_expression - | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING file_type=identifier (param_clause)?)? query_expression + | INSERT (OVERWRITE)? INTO LOCATION path=Character_String_Literal (USING storage_type=identifier (param_clause)?)? query_expression ; /* http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java new file mode 100644 index 0000000..5fed940 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/function/string/ToCharLong.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.function.string; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.engine.function.annotation.Description; +import org.apache.tajo.engine.function.annotation.ParamTypes; +import org.apache.tajo.plan.function.GeneralFunction; +import org.apache.tajo.storage.Tuple; + +import java.text.DecimalFormat; + +@Description( + functionName = "to_char", + description = "convert integer to string.", + example = "> SELECT to_char(125, '00999');\n" + + "00125", + returnType = TajoDataTypes.Type.TEXT, + paramTypes = {@ParamTypes(paramTypes = {TajoDataTypes.Type.INT8, TajoDataTypes.Type.TEXT})} +) + +public class ToCharLong extends GeneralFunction { + DecimalFormat df = null; + + public ToCharLong() { + super(new Column[]{new Column("val", TajoDataTypes.Type.INT8), new Column("format", TajoDataTypes.Type.TEXT)}); + } + + @Override + public Datum eval(Tuple params) { + if (df == null) { + df = new DecimalFormat(params.get(1).asChars()); + } + return new TextDatum(df.format(params.get(0).asInt8())); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java b/tajo-core/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java index 5a03bfd..0f2cc91 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/json/CoreGsonHelper.java @@ -26,12 +26,12 @@ import org.apache.tajo.catalog.json.FunctionAdapter; import org.apache.tajo.catalog.json.TableMetaAdapter; import org.apache.tajo.common.TajoDataTypes.DataType; import org.apache.tajo.datum.Datum; +import org.apache.tajo.json.*; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.function.AggFunction; import org.apache.tajo.plan.function.GeneralFunction; import org.apache.tajo.plan.logical.LogicalNode; import org.apache.tajo.function.Function; -import org.apache.tajo.json.*; import org.apache.tajo.plan.serder.EvalNodeAdapter; import org.apache.tajo.plan.serder.LogicalNodeAdapter; import org.apache.tajo.util.TUtil; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java index 7d6c951..90230c9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/parser/SQLAnalyzer.java @@ -30,6 +30,7 @@ import org.apache.tajo.algebra.Aggregation.GroupType; import org.apache.tajo.algebra.LiteralValue.LiteralType; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.engine.parser.SQLParser.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.StringUtils; @@ -63,6 +64,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { try { context = parser.sql(); } catch (SQLParseError e) { + e.printStackTrace(); throw new SQLSyntaxError(e); } return visitSql(context); @@ -1201,12 +1203,14 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { createTable.setExternal(); ColumnDefinition[] elements = getDefinitions(ctx.table_elements()); - String fileType = ctx.file_type.getText(); - String path = stripQuote(ctx.path.getText()); - + String storageType = ctx.storage_type.getText(); createTable.setTableElements(elements); - createTable.setStorageType(fileType); - createTable.setLocation(path); + createTable.setStorageType(storageType); + + if (PlannerUtil.isFileStorageType(storageType)) { + String path = stripQuote(ctx.path.getText()); + createTable.setLocation(path); + } } else { if (checkIfExist(ctx.table_elements())) { ColumnDefinition[] elements = getDefinitions(ctx.table_elements()); @@ -1214,7 +1218,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { } if (checkIfExist(ctx.USING())) { - String fileType = ctx.file_type.getText(); + String fileType = ctx.storage_type.getText(); createTable.setStorageType(fileType); } @@ -1488,7 +1492,7 @@ public class SQLAnalyzer extends SQLParserBaseVisitor<Expr> { insertExpr.setLocation(stripQuote(ctx.path.getText())); if (ctx.USING() != null) { - insertExpr.setStorageType(ctx.file_type.getText()); + insertExpr.setStorageType(ctx.storage_type.getText()); if (ctx.param_clause() != null) { insertExpr.setParams(escapeTableMeta(getParams(ctx.param_clause()))); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java index 6806609..2a34637 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/PhysicalPlannerImpl.java @@ -47,11 +47,9 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.SortSpecAr import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.IndexUtil; @@ -77,11 +75,9 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { private static final int UNGENERATED_PID = -1; protected final TajoConf conf; - protected final StorageManager sm; - public PhysicalPlannerImpl(final TajoConf conf, final StorageManager sm) { + public PhysicalPlannerImpl(final TajoConf conf) { this.conf = conf; - this.sm = sm; } public PhysicalExec createPlan(final TaskAttemptContext context, final LogicalNode logicalPlan) @@ -250,11 +246,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { public long estimateSizeRecursive(TaskAttemptContext ctx, String [] tableIds) throws IOException { long size = 0; for (String tableId : tableIds) { - // TODO - CSV is a hack. - List<FileFragment> fragments = FragmentConvertor.convert(ctx.getConf(), CatalogProtos.StoreType.CSV, - ctx.getTables(tableId)); - for (FileFragment frag : fragments) { - size += frag.getEndKey(); + FragmentProto[] fragmentProtos = ctx.getTables(tableId); + List<Fragment> fragments = FragmentConvertor.convert(ctx.getConf(), fragmentProtos); + for (Fragment frag : fragments) { + size += StorageManager.getFragmentLength(ctx.getConf(), frag); } } return size; @@ -446,13 +441,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { leftSortNode.setSortSpecs(sortSpecs[0]); leftSortNode.setInSchema(leftExec.getSchema()); leftSortNode.setOutSchema(leftExec.getSchema()); - ExternalSortExec outerSort = new ExternalSortExec(context, sm, leftSortNode, leftExec); + ExternalSortExec outerSort = new ExternalSortExec(context, leftSortNode, leftExec); SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); rightSortNode.setSortSpecs(sortSpecs[1]); rightSortNode.setInSchema(rightExec.getSchema()); rightSortNode.setOutSchema(rightExec.getSchema()); - ExternalSortExec innerSort = new ExternalSortExec(context, sm, rightSortNode, rightExec); + ExternalSortExec innerSort = new ExternalSortExec(context, rightSortNode, rightExec); LOG.info("Join (" + plan.getPID() +") chooses [Merge Join]"); return new MergeJoinExec(context, plan, outerSort, innerSort, sortSpecs[0], sortSpecs[1]); @@ -543,13 +538,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { leftSortNode2.setSortSpecs(sortSpecs2[0]); leftSortNode2.setInSchema(leftExec.getSchema()); leftSortNode2.setOutSchema(leftExec.getSchema()); - ExternalSortExec outerSort2 = new ExternalSortExec(context, sm, leftSortNode2, leftExec); + ExternalSortExec outerSort2 = new ExternalSortExec(context, leftSortNode2, leftExec); SortNode rightSortNode2 = LogicalPlan.createNodeWithoutPID(SortNode.class); rightSortNode2.setSortSpecs(sortSpecs2[1]); rightSortNode2.setInSchema(rightExec.getSchema()); rightSortNode2.setOutSchema(rightExec.getSchema()); - ExternalSortExec innerSort2 = new ExternalSortExec(context, sm, rightSortNode2, rightExec); + ExternalSortExec innerSort2 = new ExternalSortExec(context, rightSortNode2, rightExec); return new RightOuterMergeJoinExec(context, plan, outerSort2, innerSort2, sortSpecs2[0], sortSpecs2[1]); } @@ -634,13 +629,13 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { leftSortNode.setSortSpecs(sortSpecs3[0]); leftSortNode.setInSchema(leftExec.getSchema()); leftSortNode.setOutSchema(leftExec.getSchema()); - ExternalSortExec outerSort3 = new ExternalSortExec(context, sm, leftSortNode, leftExec); + ExternalSortExec outerSort3 = new ExternalSortExec(context, leftSortNode, leftExec); SortNode rightSortNode = LogicalPlan.createNodeWithoutPID(SortNode.class); rightSortNode.setSortSpecs(sortSpecs3[1]); rightSortNode.setInSchema(rightExec.getSchema()); rightSortNode.setOutSchema(rightExec.getSchema()); - ExternalSortExec innerSort3 = new ExternalSortExec(context, sm, rightSortNode, rightExec); + ExternalSortExec innerSort3 = new ExternalSortExec(context, rightSortNode, rightExec); return new MergeFullOuterJoinExec(context, plan, outerSort3, innerSort3, sortSpecs3[0], sortSpecs3[1]); } @@ -768,7 +763,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { switch (plan.getShuffleType()) { case HASH_SHUFFLE: case SCATTERED_HASH_SHUFFLE: - return new HashShuffleFileWriteExec(ctx, sm, plan, subOp); + return new HashShuffleFileWriteExec(ctx, plan, subOp); case RANGE_SHUFFLE: SortExec sortExec = PhysicalPlanUtil.findExecutor(subOp, SortExec.class); @@ -783,7 +778,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { specs[i] = new SortSpec(columns[i]); } } - return new RangeShuffleFileWriteExec(ctx, sm, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs); + return new RangeShuffleFileWriteExec(ctx, subOp, plan.getInSchema(), plan.getInSchema(), sortSpecs); case NONE_SHUFFLE: // if there is no given NULL CHAR property in the table property and the query is neither CTAS or INSERT, @@ -869,7 +864,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { sortNode.setInSchema(child.getSchema()); sortNode.setOutSchema(child.getSchema()); - ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, child); + ExternalSortExec sortExec = new ExternalSortExec(context, sortNode, child); LOG.info("The planner chooses [Sort-based Column Partitioned Store] algorithm"); return new SortBasedColPartitionStoreExec(context, storeTableNode, sortExec); } @@ -896,10 +891,10 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { // Since the default intermediate file format is raw file, it is not problem right now. if (checkIfSortEquivalance(ctx, scanNode, node)) { if (ctx.getTable(scanNode.getCanonicalName()) == null) { - return new SeqScanExec(ctx, sm, scanNode, null); + return new SeqScanExec(ctx, scanNode, null); } FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); - return new ExternalSortExec(ctx, sm, (SortNode) node.peek(), fragments); + return new ExternalSortExec(ctx, (SortNode) node.peek(), fragments); } else { Enforcer enforcer = ctx.getEnforcer(); @@ -919,25 +914,26 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { if (scanNode instanceof PartitionedTableScanNode) { if (broadcastFlag) { PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) scanNode; - List<FileFragment> fileFragments = TUtil.newList(); + List<Fragment> fileFragments = TUtil.newList(); + FileStorageManager fileStorageManager = (FileStorageManager)StorageManager.getFileStorageManager(ctx.getConf()); for (Path path : partitionedTableScanNode.getInputPaths()) { - fileFragments.addAll(TUtil.newList(sm.split(scanNode.getCanonicalName(), path))); + fileFragments.addAll(TUtil.newList(fileStorageManager.split(scanNode.getCanonicalName(), path))); } FragmentProto[] fragments = FragmentConvertor.toFragmentProtoArray(fileFragments.toArray(new FileFragment[fileFragments.size()])); ctx.addFragments(scanNode.getCanonicalName(), fragments); - return new PartitionMergeScanExec(ctx, sm, scanNode, fragments); + return new PartitionMergeScanExec(ctx, scanNode, fragments); } } } if (ctx.getTable(scanNode.getCanonicalName()) == null) { - return new SeqScanExec(ctx, sm, scanNode, null); + return new SeqScanExec(ctx, scanNode, null); } FragmentProto [] fragments = ctx.getTables(scanNode.getCanonicalName()); - return new SeqScanExec(ctx, sm, scanNode, fragments); + return new SeqScanExec(ctx, scanNode, fragments); } } @@ -997,7 +993,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { sortNode.setSortSpecs(sortSpecs); sortNode.setInSchema(subOp.getSchema()); sortNode.setOutSchema(subOp.getSchema()); - ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp); + ExternalSortExec sortExec = new ExternalSortExec(ctx, sortNode, subOp); LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")"); return new SortAggregateExec(ctx, groupbyNode, sortExec); } @@ -1038,7 +1034,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { sortNode.setSortSpecs(sortSpecs); sortNode.setInSchema(subOp.getSchema()); sortNode.setOutSchema(subOp.getSchema()); - child = new ExternalSortExec(context, sm, sortNode, subOp); + child = new ExternalSortExec(context, sortNode, subOp); LOG.info("The planner chooses [Sort Aggregation] in (" + TUtil.arrayToString(sortSpecs) + ")"); } @@ -1101,7 +1097,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { sortNode.setSortSpecs(sortSpecs.toArray(new SortSpec[]{})); sortNode.setInSchema(distinctNode.getInSchema()); sortNode.setOutSchema(distinctNode.getInSchema()); - ExternalSortExec sortExec = new ExternalSortExec(context, sm, sortNode, subOp); + ExternalSortExec sortExec = new ExternalSortExec(context, sortNode, subOp); return sortExec; } @@ -1132,7 +1128,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { sortNode.setSortSpecs(sortSpecs); sortNode.setInSchema(subOp.getSchema()); sortNode.setOutSchema(eachGroupbyNode.getInSchema()); - ExternalSortExec sortExec = new ExternalSortExec(ctx, sm, sortNode, subOp); + ExternalSortExec sortExec = new ExternalSortExec(ctx, sortNode, subOp); sortAggregateExec[index++] = new SortAggregateExec(ctx, eachGroupbyNode, sortExec); } @@ -1160,7 +1156,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { if (algorithm == SortEnforce.SortAlgorithm.IN_MEMORY_SORT) { return new MemSortExec(context, sortNode, child); } else { - return new ExternalSortExec(context, sm, sortNode, child); + return new ExternalSortExec(context, sortNode, child); } } @@ -1169,7 +1165,7 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { public SortExec createBestSortPlan(TaskAttemptContext context, SortNode sortNode, PhysicalExec child) throws IOException { - return new ExternalSortExec(context, sm, sortNode, child); + return new ExternalSortExec(context, sortNode, child); } public PhysicalExec createIndexScanExec(TaskAttemptContext ctx, @@ -1181,14 +1177,15 @@ public class PhysicalPlannerImpl implements PhysicalPlanner { FragmentProto [] fragmentProtos = ctx.getTables(annotation.getTableName()); List<FileFragment> fragments = - FragmentConvertor.convert(ctx.getConf(), ctx.getDataChannel().getStoreType(), fragmentProtos); + FragmentConvertor.convert(ctx.getConf(), fragmentProtos); String indexName = IndexUtil.getIndexNameOfFrag(fragments.get(0), annotation.getSortKeys()); + FileStorageManager sm = (FileStorageManager)StorageManager.getFileStorageManager(ctx.getConf()); Path indexPath = new Path(sm.getTablePath(annotation.getTableName()), "index"); TupleComparator comp = new BaseTupleComparator(annotation.getKeySchema(), annotation.getSortKeys()); - return new BSTIndexScanExec(ctx, sm, annotation, fragments.get(0), new Path(indexPath, indexName), + return new BSTIndexScanExec(ctx, annotation, fragments.get(0), new Path(indexPath, indexName), annotation.getKeySchema(), comp, annotation.getDatum()); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java index 77eb32d..aecb364 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/global/ExecutionBlock.java @@ -82,6 +82,8 @@ public class ExecutionBlock { } else if (node instanceof TableSubQueryNode) { TableSubQueryNode subQuery = (TableSubQueryNode) node; s.add(s.size(), subQuery.getSubQuery()); + } else if (node instanceof StoreTableNode) { + store = (StoreTableNode)node; } } } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java index f699607..6adc523 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/BSTIndexScanExec.java @@ -18,6 +18,7 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.Schema; @@ -47,8 +48,7 @@ public class BSTIndexScanExec extends PhysicalExec { private float progress; - public BSTIndexScanExec(TaskAttemptContext context, - StorageManager sm , ScanNode scanNode , + public BSTIndexScanExec(TaskAttemptContext context, ScanNode scanNode , FileFragment fragment, Path fileName , Schema keySchema, TupleComparator comparator , Datum[] datum) throws IOException { super(context, scanNode.getInSchema(), scanNode.getOutSchema()); @@ -61,7 +61,8 @@ public class BSTIndexScanExec extends PhysicalExec { this.fileScanner.init(); this.projector = new Projector(context, inSchema, outSchema, scanNode.getTargets()); - this.reader = new BSTIndex(sm.getFileSystem().getConf()). + FileSystem fs = fileName.getFileSystem(context.getConf()); + this.reader = new BSTIndex(fs.getConf()). getIndexReader(fileName, keySchema, comparator); this.reader.open(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index ec239de..8ee4e2f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -35,6 +35,7 @@ import org.apache.tajo.plan.logical.InsertNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.unit.StorageUnit; @@ -120,6 +121,7 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { super.init(); storeTablePath = context.getOutputPath(); + FileSystem fs = storeTablePath.getFileSystem(context.getConf()); if (!fs.exists(storeTablePath.getParent())) { fs.mkdirs(storeTablePath.getParent()); @@ -160,7 +162,8 @@ public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { actualFilePath = new Path(lastFileName + "_" + suffixId); } - appender = StorageManager.getStorageManager(context.getConf()).getAppender(meta, outSchema, actualFilePath); + appender = ((FileStorageManager)StorageManager.getFileStorageManager(context.getConf())) + .getAppender(meta, outSchema, actualFilePath); appender.enableStats(); appender.init(); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java index 121e6bd..4e19114 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ExternalSortExec.java @@ -106,7 +106,7 @@ public class ExternalSortExec extends SortExec { /** total bytes of input data */ private long sortAndStoredBytes; - private ExternalSortExec(final TaskAttemptContext context, final StorageManager sm, final SortNode plan) + private ExternalSortExec(final TaskAttemptContext context, final SortNode plan) throws PhysicalPlanningException { super(context, plan.getInSchema(), plan.getOutSchema(), null, plan.getSortKeys()); @@ -128,10 +128,9 @@ public class ExternalSortExec extends SortExec { localFS = new RawLocalFileSystem(); } - public ExternalSortExec(final TaskAttemptContext context, - final StorageManager sm, final SortNode plan, + public ExternalSortExec(final TaskAttemptContext context,final SortNode plan, final CatalogProtos.FragmentProto[] fragments) throws PhysicalPlanningException { - this(context, sm, plan); + this(context, plan); mergedInputFragments = TUtil.newList(); for (CatalogProtos.FragmentProto proto : fragments) { @@ -140,10 +139,9 @@ public class ExternalSortExec extends SortExec { } } - public ExternalSortExec(final TaskAttemptContext context, - final StorageManager sm, final SortNode plan, final PhysicalExec child) + public ExternalSortExec(final TaskAttemptContext context, final SortNode plan, final PhysicalExec child) throws IOException { - this(context, sm, plan); + this(context, plan); setChild(child); } @@ -175,7 +173,7 @@ public class ExternalSortExec extends SortExec { long chunkWriteStart = System.currentTimeMillis(); Path outputPath = getChunkPathForWrite(0, chunkId); - final RawFileAppender appender = new RawFileAppender(context.getConf(), inSchema, meta, outputPath); + final RawFileAppender appender = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); appender.init(); for (Tuple t : tupleBlock) { appender.addTuple(t); @@ -473,7 +471,7 @@ public class ExternalSortExec extends SortExec { final Path outputPath = getChunkPathForWrite(level + 1, nextRunId); info(LOG, mergeFanout + " files are being merged to an output file " + outputPath.getName()); long mergeStartTime = System.currentTimeMillis(); - final RawFileAppender output = new RawFileAppender(context.getConf(), inSchema, meta, outputPath); + final RawFileAppender output = new RawFileAppender(context.getConf(), null, inSchema, meta, outputPath); output.init(); final Scanner merger = createKWayMerger(inputFiles, startIdx, mergeFanout); merger.init(); @@ -857,7 +855,7 @@ public class ExternalSortExec extends SortExec { if (finalOutputFiles != null) { for (FileFragment frag : finalOutputFiles) { File tmpFile = new File(localFS.makeQualified(frag.getPath()).toUri()); - if (frag.getStartKey() == 0 && frag.getEndKey() == tmpFile.length()) { + if (frag.getStartKey() == 0 && frag.getLength() == tmpFile.length()) { localFS.delete(frag.getPath(), true); LOG.info("Delete file: " + frag); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java index 5bf80fd..d051fb6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/HashShuffleFileWriteExec.java @@ -55,7 +55,7 @@ public final class HashShuffleFileWriteExec extends UnaryPhysicalExec { private HashShuffleAppenderManager hashShuffleAppenderManager; private int numHashShuffleBufferTuples; - public HashShuffleFileWriteExec(TaskAttemptContext context, final StorageManager sm, + public HashShuffleFileWriteExec(TaskAttemptContext context, final ShuffleFileWriteNode plan, final PhysicalExec child) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema(), child); Preconditions.checkArgument(plan.hasShuffleKeys()); http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java index 0569c1b..5297e2c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PartitionMergeScanExec.java @@ -44,25 +44,22 @@ public class PartitionMergeScanExec extends PhysicalExec { private List<SeqScanExec> scanners = Lists.newArrayList(); private Iterator<SeqScanExec> iterator; - private StorageManager sm; - private float progress; protected TableStats inputStats; - public PartitionMergeScanExec(TaskAttemptContext context, StorageManager sm, + public PartitionMergeScanExec(TaskAttemptContext context, ScanNode plan, CatalogProtos.FragmentProto[] fragments) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); this.plan = plan; this.fragments = fragments; - this.sm = sm; inputStats = new TableStats(); } public void init() throws IOException { for (CatalogProtos.FragmentProto fragment : fragments) { - SeqScanExec scanExec = new SeqScanExec(context, sm, (ScanNode) PlannerUtil.clone(null, plan), + SeqScanExec scanExec = new SeqScanExec(context, (ScanNode) PlannerUtil.clone(null, plan), new CatalogProtos.FragmentProto[] {fragment}); scanners.add(scanExec); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java index a63b838..247b373 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/PhysicalPlanUtil.java @@ -34,10 +34,7 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.logical.NodeType; import org.apache.tajo.plan.logical.PersistentStoreNode; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.StorageConstants; -import org.apache.tajo.storage.StorageManager; -import org.apache.tajo.storage.TupleComparator; +import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; import org.apache.tajo.storage.fragment.FragmentConvertor; @@ -141,9 +138,7 @@ public class PhysicalPlanUtil { int currentDepth, int maxDepth) throws IOException { // Intermediate directory if (fs.isDirectory(path)) { - - FileStatus[] files = fs.listStatus(path, StorageManager.hiddenFileFilter); - + FileStatus[] files = fs.listStatus(path, FileStorageManager.hiddenFileFilter); if (files != null && files.length > 0) { for (FileStatus eachFile : files) { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index 79993da..119f053 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -51,7 +51,7 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { private FileAppender appender; private TableMeta meta; - public RangeShuffleFileWriteExec(final TaskAttemptContext context, final StorageManager sm, + public RangeShuffleFileWriteExec(final TaskAttemptContext context, final PhysicalExec child, final Schema inSchema, final Schema outSchema, final SortSpec[] sortSpecs) throws IOException { super(context, inSchema, outSchema, child); @@ -78,8 +78,8 @@ public class RangeShuffleFileWriteExec extends UnaryPhysicalExec { context.getDataChannel().getStoreType() : CatalogProtos.StoreType.RAW); FileSystem fs = new RawLocalFileSystem(); fs.mkdirs(storeTablePath); - this.appender = (FileAppender) StorageManager.getStorageManager(context.getConf()).getAppender(meta, - outSchema, new Path(storeTablePath, "output")); + this.appender = (FileAppender) ((FileStorageManager)StorageManager.getFileStorageManager(context.getConf())) + .getAppender(meta, outSchema, new Path(storeTablePath, "output")); this.appender.enableStats(); this.appender.init(); this.indexWriter = bst.getIndexWriter(new Path(storeTablePath, "index"), http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 1ed25d8..f507988 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -42,6 +42,7 @@ import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.*; import org.apache.tajo.storage.fragment.FileFragment; +import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.worker.TaskAttemptContext; @@ -69,7 +70,7 @@ public class SeqScanExec extends PhysicalExec { private boolean cacheRead = false; - public SeqScanExec(TaskAttemptContext context, StorageManager sm, ScanNode plan, + public SeqScanExec(TaskAttemptContext context, ScanNode plan, CatalogProtos.FragmentProto [] fragments) throws IOException { super(context, plan.getInSchema(), plan.getOutSchema()); @@ -81,9 +82,8 @@ public class SeqScanExec extends PhysicalExec { String pathNameKey = ""; if (fragments != null) { for (FragmentProto f : fragments) { - FileFragment fileFragement = FragmentConvertor.convert( - context.getConf(), plan.getTableDesc().getMeta().getStoreType(), f); - pathNameKey += fileFragement.getPath(); + Fragment fragement = FragmentConvertor.convert(context.getConf(), f); + pathNameKey += fragement.getKey(); } } @@ -227,13 +227,13 @@ public class SeqScanExec extends PhysicalExec { if (fragments != null) { if (fragments.length > 1) { this.scanner = new MergeScanner(context.getConf(), plan.getPhysicalSchema(), meta, - FragmentConvertor.<FileFragment>convert(context.getConf(), plan.getTableDesc().getMeta().getStoreType(), - fragments), projected + FragmentConvertor.convert(context.getConf(), fragments), projected ); } else { - this.scanner = StorageManager.getStorageManager( - context.getConf()).getScanner(meta, plan.getPhysicalSchema(), fragments[0], - projected); + StorageManager storageManager = StorageManager.getStorageManager( + context.getConf(), plan.getTableDesc().getMeta().getStoreType()); + this.scanner = storageManager.getScanner(meta, + plan.getPhysicalSchema(), fragments[0], projected); } scanner.init(); } http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java index 0dc172c..3d3da5c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/StoreTableExec.java @@ -23,12 +23,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.StatisticsUtil; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.logical.InsertNode; import org.apache.tajo.plan.logical.PersistentStoreNode; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.storage.Appender; +import org.apache.tajo.storage.FileStorageManager; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.Tuple; import org.apache.tajo.unit.StorageUnit; @@ -78,30 +81,33 @@ public class StoreTableExec extends UnaryPhysicalExec { } public void openNewFile(int suffixId) throws IOException { - String prevFile = null; + Schema appenderSchema = (plan instanceof InsertNode) ? ((InsertNode) plan).getTableSchema() : outSchema; - lastFileName = context.getOutputPath(); - if (suffixId > 0) { - prevFile = lastFileName.toString(); + if (PlannerUtil.isFileStorageType(meta.getStoreType())) { + String prevFile = null; - lastFileName = new Path(lastFileName + "_" + suffixId); - } + lastFileName = context.getOutputPath(); + + if (suffixId > 0) { + prevFile = lastFileName.toString(); + lastFileName = new Path(lastFileName + "_" + suffixId); + } + + appender = ((FileStorageManager)StorageManager.getFileStorageManager(context.getConf())) + .getAppender(meta, appenderSchema, lastFileName); - if (plan instanceof InsertNode) { - InsertNode createTableNode = (InsertNode) plan; - appender = StorageManager.getStorageManager(context.getConf()).getAppender(meta, - createTableNode.getTableSchema(), context.getOutputPath()); + if (suffixId > 0) { + LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + + "The remain output will be written into " + lastFileName.toString()); + } } else { - appender = StorageManager.getStorageManager(context.getConf()).getAppender(meta, outSchema, lastFileName); + appender = StorageManager.getStorageManager(context.getConf(), meta.getStoreType()).getAppender( + context.getQueryContext(), + context.getTaskId(), meta, appenderSchema, context.getQueryContext().getStagingDir()); } appender.enableStats(); appender.init(); - - if (suffixId > 0) { - LOG.info(prevFile + " exceeds " + SessionVars.MAX_OUTPUT_FILE_SIZE.keyname() + " (" + maxPerFileSize + " MB), " + - "The remain output will be written into " + lastFileName.toString()); - } } /* (non-Javadoc) http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java index 47ead40..493ca6e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/query/QueryContext.java @@ -21,11 +21,11 @@ package org.apache.tajo.engine.query; import org.apache.hadoop.fs.Path; import org.apache.tajo.ConfigKey; import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.session.Session; -import org.apache.tajo.validation.Validator; import org.apache.tajo.plan.logical.NodeType; import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetProto; @@ -34,41 +34,6 @@ import static org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueSetPro * QueryContent is a overridable config, and it provides a set of various configs for a query instance. */ public class QueryContext extends OverridableConf { - public static enum QueryVars implements ConfigKey { - COMMAND_TYPE, - STAGING_DIR, - OUTPUT_TABLE_NAME, - OUTPUT_TABLE_PATH, - OUTPUT_PARTITIONS, - OUTPUT_OVERWRITE, - OUTPUT_AS_DIRECTORY, - OUTPUT_PER_FILE_SIZE, - ; - - QueryVars() { - } - - @Override - public String keyname() { - return name().toLowerCase(); - } - - @Override - public ConfigType type() { - return ConfigType.QUERY; - } - - @Override - public Class<?> valueClass() { - return null; - } - - @Override - public Validator validator() { - return null; - } - } - public QueryContext(TajoConf conf) { super(conf, ConfigKey.ConfigType.QUERY); } @@ -103,8 +68,8 @@ public class QueryContext extends OverridableConf { } public Path getStagingDir() { - String strVal = get(QueryVars.STAGING_DIR); - return strVal != null ? new Path(strVal) : null; + String strVal = get(QueryVars.STAGING_DIR, ""); + return strVal != null && !strVal.isEmpty() ? new Path(strVal) : null; } /** @@ -127,7 +92,9 @@ public class QueryContext extends OverridableConf { } public void setOutputPath(Path path) { - put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString()); + if (path != null) { + put(QueryVars.OUTPUT_TABLE_PATH, path.toUri().toString()); + } } public Path getOutputPath() { http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java ---------------------------------------------------------------------- diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java index aeb4e05..3bb1b5b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/TupleUtil.java @@ -125,12 +125,13 @@ public class TupleUtil { Tuple startTuple = new VTuple(target.size()); Tuple endTuple = new VTuple(target.size()); int i = 0; + int sortSpecIndex = 0; // In outer join, empty table could be searched. // As a result, min value and max value would be null. // So, we should put NullDatum for this case. for (Column col : target.getColumns()) { - if (sortSpecs[i].isAscending()) { + if (sortSpecs[sortSpecIndex].isAscending()) { if (statSet.get(col).getMinValue() != null) startTuple.put(i, statSet.get(col).getMinValue()); else @@ -164,6 +165,10 @@ public class TupleUtil { else endTuple.put(i, DatumFactory.createNullDatum()); } + if (target.getColumns().size() == sortSpecs.length) { + // Not composite column sort + sortSpecIndex++; + } i++; } return new TupleRange(sortSpecs, startTuple, endTuple);
