http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java new file mode 100644 index 0000000..a0ad492 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HBaseTextSerializerDeserializer.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.DatumFactory; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.util.NumberUtil; + +import java.io.IOException; + +public class HBaseTextSerializerDeserializer { + public static Datum deserialize(Column col, byte[] bytes) throws IOException { + Datum datum; + switch (col.getDataType().getType()) { + case INT1: + case INT2: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : + DatumFactory.createInt2((short)NumberUtil.parseInt(bytes, 0, bytes.length)); + break; + case INT4: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : + DatumFactory.createInt4(NumberUtil.parseInt(bytes, 0, bytes.length)); + break; + case INT8: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : + DatumFactory.createInt8(new String(bytes, 0, bytes.length)); + break; + case FLOAT4: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : + DatumFactory.createFloat4(new String(bytes, 0, bytes.length)); + break; + case FLOAT8: + datum = bytes == null || bytes.length == 0 ? NullDatum.get() : + DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, 0, bytes.length)); + break; + case TEXT: + datum = bytes == null ? NullDatum.get() : DatumFactory.createText(bytes); + break; + default: + datum = NullDatum.get(); + break; + } + return datum; + } + + public static byte[] serialize(Column col, Datum datum) throws IOException { + if (datum == null || datum instanceof NullDatum) { + return null; + } + + return datum.asChars().getBytes(); + } +}
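For reference, a minimal usage sketch of the serializer above. It is illustrative only: it assumes the Tajo catalog and datum classes used in this patch are on the classpath, and the HBaseTextSerdeExample class name is hypothetical.

import org.apache.tajo.catalog.Column;
import org.apache.tajo.common.TajoDataTypes.Type;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.storage.hbase.HBaseTextSerializerDeserializer;

public class HBaseTextSerdeExample { // hypothetical driver class
  public static void main(String[] args) throws Exception {
    Column col = new Column("id", Type.INT4);

    // serialize() renders a non-null datum as the raw bytes of its text form;
    // a null datum serializes to null, i.e. no cell value is written.
    byte[] bytes = HBaseTextSerializerDeserializer.serialize(col, DatumFactory.createInt4(42));

    // deserialize() parses the bytes according to the column's declared type;
    // a null or empty byte array yields NullDatum instead.
    Datum parsed = HBaseTextSerializerDeserializer.deserialize(col, bytes);
    System.out.println(parsed.asInt4()); // prints 42
  }
}

Because values are stored as UTF-8 numeric text, an empty HBase cell round-trips to NullDatum, not to 0.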
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java new file mode 100644 index 0000000..07f7988 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/HFileAppender.java @@ -0,0 +1,167 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.storage.Tuple; +import org.apache.tajo.util.Bytes; + +import java.io.IOException; +import java.util.TreeSet; + +public class HFileAppender extends AbstractHBaseAppender { + private static final Log LOG = LogFactory.getLog(HFileAppender.class); + + private RecordWriter<ImmutableBytesWritable, Cell> writer; + private TaskAttemptContext writerContext; + private Path workingFilePath; + private FileOutputCommitter committer; + + public HFileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, + Schema schema, TableMeta meta, Path stagingDir) { + super(conf, taskAttemptId, schema, meta, stagingDir); + } + + @Override + public void init() throws IOException { + super.init(); + + Configuration taskConf = new Configuration(); + Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); + taskConf.set(FileOutputFormat.OUTDIR, stagingResultDir.toString()); + + ExecutionBlockId ebId = taskAttemptId.getQueryUnitId().getExecutionBlockId(); + writerContext = new TaskAttemptContextImpl(taskConf, + new 
TaskAttemptID(ebId.getQueryId().toString(), ebId.getId(), TaskType.MAP, + taskAttemptId.getQueryUnitId().getId(), taskAttemptId.getId())); + + HFileOutputFormat2 hFileOutputFormat2 = new HFileOutputFormat2(); + try { + writer = hFileOutputFormat2.getRecordWriter(writerContext); + + committer = new FileOutputCommitter(FileOutputFormat.getOutputPath(writerContext), writerContext); + workingFilePath = committer.getWorkPath(); + } catch (InterruptedException e) { + throw new IOException(e.getMessage(), e); + } + + LOG.info("Created HBase file writer: " + workingFilePath); + } + + long totalNumBytes = 0; + ImmutableBytesWritable keyWritable = new ImmutableBytesWritable(); + boolean first = true; + TreeSet<KeyValue> kvSet = new TreeSet<KeyValue>(KeyValue.COMPARATOR); + + + @Override + public void addTuple(Tuple tuple) throws IOException { + Datum datum; + + byte[] rowkey = getRowKeyBytes(tuple); + + if (!first && !Bytes.equals(keyWritable.get(), 0, keyWritable.getLength(), rowkey, 0, rowkey.length)) { + try { + for (KeyValue kv : kvSet) { + writer.write(keyWritable, kv); + totalNumBytes += keyWritable.getLength() + kv.getLength(); + } + kvSet.clear(); + // Statistical section + if (enabledStats) { + stats.incrementRow(); + } + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + } + } + + first = false; + + keyWritable.set(rowkey); + + readKeyValues(tuple, rowkey); + if (keyValues != null) { + for (KeyValue eachKeyVal: keyValues) { + kvSet.add(eachKeyVal); + } + } + } + + @Override + public void flush() throws IOException { + } + + @Override + public long getEstimatedOutputSize() throws IOException { + // StoreTableExec uses this value as rolling file length + // Not rolling + return 0; + } + + @Override + public void close() throws IOException { + if (!kvSet.isEmpty()) { + try { + for (KeyValue kv : kvSet) { + writer.write(keyWritable, kv); + totalNumBytes += keyWritable.getLength() + kv.getLength(); + } + kvSet.clear(); + // Statistical section + if (enabledStats) { + stats.incrementRow(); + } + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + } + } + + if (enabledStats) { + stats.setNumBytes(totalNumBytes); + } + if (writer != null) { + try { + writer.close(writerContext); + committer.commitTask(writerContext); + } catch (InterruptedException e) { + LOG.error(e.getMessage(), e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java new file mode 100644 index 0000000..3a58e50 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/IndexPredication.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.datum.Datum; + +public class IndexPredication { + private Column column; + private int columnId; + private Datum startValue; + private Datum stopValue; + + public Column getColumn() { + return column; + } + + public void setColumn(Column column) { + this.column = column; + } + + public int getColumnId() { + return columnId; + } + + public void setColumnId(int columnId) { + this.columnId = columnId; + } + + public Datum getStartValue() { + return startValue; + } + + public void setStartValue(Datum startValue) { + this.startValue = startValue; + } + + public Datum getStopValue() { + return stopValue; + } + + public void setStopValue(Datum stopValue) { + this.stopValue = stopValue; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java new file mode 100644 index 0000000..4577703 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/java/org/apache/tajo/storage/hbase/RowKeyMapping.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.hbase; + +public class RowKeyMapping { + private boolean isBinary; + private int keyFieldIndex; + + public boolean isBinary() { + return isBinary; + } + + public void setBinary(boolean isBinary) { + this.isBinary = isBinary; + } + + public int getKeyFieldIndex() { + return keyFieldIndex; + } + + public void setKeyFieldIndex(int keyFieldIndex) { + this.keyFieldIndex = keyFieldIndex; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto new file mode 100644 index 0000000..668b116 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/main/proto/StorageFragmentProtos.proto @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +option java_package = "org.apache.tajo.storage.hbase"; +option java_outer_classname = "StorageFragmentProtos"; +option optimize_for = SPEED; +option java_generic_services = false; +option java_generate_equals_and_hash = true; + +import "CatalogProtos.proto"; + +message HBaseFragmentProto { + required string tableName = 1; + required string hbaseTableName = 2; + required bytes startRow = 3; + required bytes stopRow = 4; + required bool last = 5; + required int64 length = 6; + optional string regionLocation = 7; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java new file mode 100644 index 0000000..68939d6 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestColumnMapping.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.util.KeyValueSet; +import org.junit.Test; + +import java.util.List; + +import static org.junit.Assert.*; + +public class TestColumnMapping { + @Test + public void testColumnKeyValueMapping() throws Exception { + KeyValueSet keyValueSet = new KeyValueSet(); + keyValueSet.set(HBaseStorageConstants.META_TABLE_KEY, "test"); + keyValueSet.set(HBaseStorageConstants.META_COLUMNS_KEY, ":key,col2:key:,col2:value:#b,col3:"); + + Schema schema = new Schema(); + schema.addColumn("c1", Type.TEXT); + schema.addColumn("c2", Type.TEXT); + schema.addColumn("c3", Type.TEXT); + schema.addColumn("c4", Type.TEXT); + + TableMeta tableMeta = new TableMeta(StoreType.HBASE, keyValueSet); + + ColumnMapping columnMapping = new ColumnMapping(schema, tableMeta); + + List<String> cfNames = columnMapping.getColumnFamilyNames(); + assertEquals(2, cfNames.size()); + assertEquals("col2", cfNames.get(0)); + assertEquals("col3", cfNames.get(1)); + + for (int i = 0; i < columnMapping.getIsBinaryColumns().length; i++) { + if (i == 2) { + assertTrue(columnMapping.getIsBinaryColumns()[i]); + } else { + assertFalse(columnMapping.getIsBinaryColumns()[i]); + } + } + + for (int i = 0; i < columnMapping.getIsRowKeyMappings().length; i++) { + if (i == 0) { + assertTrue(columnMapping.getIsRowKeyMappings()[i]); + } else { + assertFalse(columnMapping.getIsRowKeyMappings()[i]); + } + } + + String[] expectedColumnNames = { null, null, null, null}; + for (int i = 0; i < schema.size(); i++) { + String columnName = columnMapping.getMappingColumns()[i][1] == null ? null : + new String(columnMapping.getMappingColumns()[i][1]); + assertEquals(expectedColumnNames[i], columnName); + } + + for (int i = 0; i < schema.size(); i++) { + if (i == 1) { + assertTrue(columnMapping.getIsColumnKeys()[i]); + } else { + assertFalse(columnMapping.getIsColumnKeys()[i]); + } + } + + for (int i = 0; i < schema.size(); i++) { + if (i == 2) { + assertTrue(columnMapping.getIsColumnValues()[i]); + } else { + assertFalse(columnMapping.getIsColumnValues()[i]); + } + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java new file mode 100644 index 0000000..1fc4065 --- /dev/null +++ b/tajo-storage/tajo-storage-hbase/src/test/java/org/apache/tajo/storage/hbase/TestHBaseStorageManager.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.hbase; + +import org.apache.tajo.catalog.Column; +import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; +import org.apache.tajo.common.TajoDataTypes.Type; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.TextDatum; +import org.apache.tajo.plan.expr.*; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.storage.StorageManager; +import org.apache.tajo.util.Pair; +import org.junit.Test; + +import java.util.List; +import java.util.Set; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestHBaseStorageManager { + @Test + public void testGetIndexPredications() throws Exception { + Column rowkeyColumn = new Column("rk", Type.TEXT); + // where rk >= '020' and rk <= '055' + ScanNode scanNode = new ScanNode(1); + EvalNode evalNode1 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("020"))); + EvalNode evalNode2 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("055"))); + EvalNode evalNodeA = new BinaryEval(EvalType.AND, evalNode1, evalNode2); + scanNode.setQual(evalNodeA); + + HBaseStorageManager storageManager = + (HBaseStorageManager) StorageManager.getStorageManager(new TajoConf(), StoreType.HBASE); + List<Set<EvalNode>> indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + assertNotNull(indexEvals); + assertEquals(1, indexEvals.size()); + Pair<Datum, Datum> indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); + assertEquals("020", indexPredicateValue.getFirst().asChars()); + assertEquals("055", indexPredicateValue.getSecond().asChars()); + + // where (rk >= '020' and rk <= '055') or rk = '075' + EvalNode evalNode3 = new BinaryEval(EvalType.EQUAL, new FieldEval(rowkeyColumn),new ConstEval(new TextDatum("075"))); + EvalNode evalNodeB = new BinaryEval(EvalType.OR, evalNodeA, evalNode3); + scanNode.setQual(evalNodeB); + indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + assertEquals(2, indexEvals.size()); + indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); + assertEquals("020", indexPredicateValue.getFirst().asChars()); + assertEquals("055", indexPredicateValue.getSecond().asChars()); + + indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1)); + assertEquals("075", indexPredicateValue.getFirst().asChars()); + assertEquals("075", indexPredicateValue.getSecond().asChars()); + + // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078') + EvalNode evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072"))); 
+ EvalNode evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078"))); + EvalNode evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + EvalNode evalNodeD = new BinaryEval(EvalType.OR, evalNodeA, evalNodeC); + scanNode.setQual(evalNodeD); + indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + assertEquals(2, indexEvals.size()); + + indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); + assertEquals("020", indexPredicateValue.getFirst().asChars()); + assertEquals("055", indexPredicateValue.getSecond().asChars()); + + indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1)); + assertEquals("072", indexPredicateValue.getFirst().asChars()); + assertEquals("078", indexPredicateValue.getSecond().asChars()); + + // where (rk >= '020' and rk <= '055') or (rk >= '072' and rk <= '078' and rk >= '073') + evalNode4 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("072"))); + evalNode5 = new BinaryEval(EvalType.LEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("078"))); + evalNodeC = new BinaryEval(EvalType.AND, evalNode4, evalNode5); + EvalNode evalNode6 = new BinaryEval(EvalType.GEQ, new FieldEval(rowkeyColumn), new ConstEval(new TextDatum("073"))); + evalNodeD = new BinaryEval(EvalType.AND, evalNodeC, evalNode6); + EvalNode evalNodeE = new BinaryEval(EvalType.OR, evalNodeA, evalNodeD); + scanNode.setQual(evalNodeE); + indexEvals = storageManager.findIndexablePredicateSet(scanNode, new Column[]{rowkeyColumn}); + assertEquals(2, indexEvals.size()); + + indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(0)); + assertEquals("020", indexPredicateValue.getFirst().asChars()); + assertEquals("055", indexPredicateValue.getSecond().asChars()); + + indexPredicateValue = storageManager.getIndexablePredicateValue(null, indexEvals.get(1)); + assertEquals("073", indexPredicateValue.getFirst().asChars()); + assertEquals("078", indexPredicateValue.getSecond().asChars()); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/pom.xml b/tajo-storage/tajo-storage-hdfs/pom.xml new file mode 100644 index 0000000..5105ac5 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/pom.xml @@ -0,0 +1,380 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- +Copyright 2012 Database Lab., Korea Univ. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+--> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>tajo-project</artifactId> + <groupId>org.apache.tajo</groupId> + <version>0.9.1-SNAPSHOT</version> + <relativePath>../../tajo-project</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>tajo-storage-hdfs</artifactId> + <packaging>jar</packaging> + <name>Tajo HDFS Storage</name> + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> + <parquet.version>1.5.0</parquet.version> + <parquet.format.version>2.1.0</parquet.format.version> + </properties> + + <repositories> + <repository> + <id>repository.jboss.org</id> + <url>https://repository.jboss.org/nexus/content/repositories/releases/ + </url> + <snapshots> + <enabled>false</enabled> + </snapshots> + </repository> + </repositories> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.6</source> + <target>1.6</target> + <encoding>${project.build.sourceEncoding}</encoding> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.rat</groupId> + <artifactId>apache-rat-plugin</artifactId> + <executions> + <execution> + <phase>verify</phase> + <goals> + <goal>check</goal> + </goals> + </execution> + </executions> + <configuration> + <excludes> + <exclude>src/test/resources/testVariousTypes.avsc</exclude> + </excludes> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemProperties> + <tajo.test>TRUE</tajo.test> + </systemProperties> + <argLine>-Xms512m -Xmx1024m -XX:MaxPermSize=128m -Dfile.encoding=UTF-8</argLine> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <id>create-protobuf-generated-sources-directory</id> + <phase>initialize</phase> + <configuration> + <target> + <mkdir dir="target/generated-sources/proto" /> + </target> + </configuration> + <goals> + <goal>run</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>exec-maven-plugin</artifactId> + <version>1.2</version> + <executions> + <execution> + <id>generate-sources</id> + <phase>generate-sources</phase> + <configuration> + <executable>protoc</executable> + <arguments> + <argument>-Isrc/main/proto/</argument> + <argument>--proto_path=../../tajo-common/src/main/proto</argument> + <argument>--proto_path=../../tajo-catalog/tajo-catalog-common/src/main/proto</argument> + <argument>--java_out=target/generated-sources/proto</argument> + <argument>src/main/proto/StorageFragmentProtos.proto</argument> + </arguments> + </configuration> + <goals> + <goal>exec</goal> + </goals> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.codehaus.mojo</groupId> + <artifactId>build-helper-maven-plugin</artifactId> + <version>1.5</version> + <executions> + <execution> + 
<id>add-source</id> + <phase>generate-sources</phase> + <goals> + <goal>add-source</goal> + </goals> + <configuration> + <sources> + <source>target/generated-sources/proto</source> + </sources> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + </plugin> + </plugins> + </build> + + + <dependencies> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-catalog-common</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-plan</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.tajo</groupId> + <artifactId>tajo-storage-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>trevni-core</artifactId> + <version>1.7.3</version> + </dependency> + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>trevni-avro</artifactId> + <version>1.7.3</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <artifactId>zookeeper</artifactId> + <groupId>org.apache.zookeeper</groupId> + </exclusion> + <exclusion> + <artifactId>slf4j-api</artifactId> + <groupId>org.slf4j</groupId> + </exclusion> + <exclusion> + <artifactId>jersey-json</artifactId> + <groupId>com.sun.jersey</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-minicluster</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>commons-el</groupId> + <artifactId>commons-el</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-runtime</artifactId> + </exclusion> + <exclusion> + <groupId>tomcat</groupId> + <artifactId>jasper-compiler</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1-jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey.jersey-test-framework</groupId> + <artifactId>jersey-test-framework-grizzly2</artifactId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-server-tests</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-jobclient</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + 
<artifactId>hadoop-mapreduce-client-app</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-yarn-api</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-hs</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + <exclusion> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-column</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-format</artifactId> + <version>${parquet.format.version}</version> + </dependency> + <dependency> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </dependency> + </dependencies> + + <profiles> + <profile> + <id>docs</id> + <activation> + <activeByDefault>false</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <executions> + <execution> + <!-- build javadoc jars per jar for publishing to maven --> + <id>module-javadocs</id> + <phase>package</phase> + <goals> + <goal>jar</goal> + </goals> + <configuration> + <destDir>${project.build.directory}</destDir> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> + + <reporting> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-surefire-report-plugin</artifactId> + <version>2.15</version> + </plugin> + </plugins> + </reporting> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java new file mode 100644 index 0000000..4bf4c99 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CSVFile.java @@ -0,0 +1,587 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage; + +import org.apache.commons.lang.StringEscapeUtils; +import org.apache.commons.lang.StringUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.*; +import org.apache.tajo.QueryUnitAttemptId; +import org.apache.tajo.catalog.Schema; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.statistics.TableStats; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.datum.Datum; +import org.apache.tajo.datum.NullDatum; +import org.apache.tajo.exception.UnsupportedException; +import org.apache.tajo.storage.compress.CodecPool; +import org.apache.tajo.storage.exception.AlreadyExistsStorageException; +import org.apache.tajo.storage.fragment.Fragment; +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; +import org.apache.tajo.util.BytesUtils; + +import java.io.*; +import java.util.ArrayList; +import java.util.Arrays; + +public class CSVFile { + + public static final byte LF = '\n'; + public static int EOF = -1; + + private static final Log LOG = LogFactory.getLog(CSVFile.class); + + public static class CSVAppender extends FileAppender { + private final TableMeta meta; + private final Schema schema; + private final int columnNum; + private final FileSystem fs; + private FSDataOutputStream fos; + private DataOutputStream outputStream; + private CompressionOutputStream deflateFilter; + private char delimiter; + private TableStatistics stats = null; + private Compressor compressor; + private CompressionCodecFactory codecFactory; + private CompressionCodec codec; + private Path compressedPath; + private byte[] nullChars; + private int BUFFER_SIZE = 128 * 1024; + private int bufferedBytes = 0; + private long pos = 0; + private boolean isShuffle; + + private NonSyncByteArrayOutputStream os = new NonSyncByteArrayOutputStream(BUFFER_SIZE); + private SerializerDeserializer serde; + + public CSVAppender(Configuration conf, final QueryUnitAttemptId taskAttemptId, + final Schema schema, final TableMeta meta, final Path workDir) throws IOException { + super(conf, taskAttemptId, schema, meta, workDir); + this.fs = workDir.getFileSystem(conf); + this.meta = meta; + this.schema = schema; + this.delimiter = StringEscapeUtils.unescapeJava( + this.meta.getOption(StorageConstants.TEXT_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER)).charAt(0); + + this.columnNum = schema.size(); + + String nullCharacters = StringEscapeUtils.unescapeJava( + this.meta.getOption(StorageConstants.TEXT_NULL, NullDatum.DEFAULT_TEXT)); + + if (StringUtils.isEmpty(nullCharacters)) { + nullChars = NullDatum.get().asTextBytes(); + } else { + nullChars = nullCharacters.getBytes(); + } + } + + @Override + public void init() throws IOException { + if (!fs.exists(path.getParent())) { + throw new FileNotFoundException(path.getParent().toString()); + } + + //determine the 
intermediate file type + String store = conf.get(TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.varname, + TajoConf.ConfVars.SHUFFLE_FILE_FORMAT.defaultVal); + if (enabledStats && CatalogProtos.StoreType.CSV == CatalogProtos.StoreType.valueOf(store.toUpperCase())) { + isShuffle = true; + } else { + isShuffle = false; + } + + if(this.meta.containsOption(StorageConstants.COMPRESSION_CODEC)) { + String codecName = this.meta.getOption(StorageConstants.COMPRESSION_CODEC); + codecFactory = new CompressionCodecFactory(conf); + codec = codecFactory.getCodecByClassName(codecName); + compressor = CodecPool.getCompressor(codec); + if(compressor != null) compressor.reset(); //compressor is null for the built-in gzip codec + + String extension = codec.getDefaultExtension(); + compressedPath = path.suffix(extension); + + if (fs.exists(compressedPath)) { + throw new AlreadyExistsStorageException(compressedPath); + } + + fos = fs.create(compressedPath); + deflateFilter = codec.createOutputStream(fos, compressor); + outputStream = new DataOutputStream(deflateFilter); + + } else { + if (fs.exists(path)) { + throw new AlreadyExistsStorageException(path); + } + fos = fs.create(path); + outputStream = new DataOutputStream(new BufferedOutputStream(fos)); + } + + if (enabledStats) { + this.stats = new TableStatistics(this.schema); + } + + try { + //It will be removed, because we will add a custom serde for text files + String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, + TextSerializerDeserializer.class.getName()); + serde = (SerializerDeserializer) Class.forName(serdeClass).newInstance(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e); + } + + os.reset(); + pos = fos.getPos(); + bufferedBytes = 0; + super.init(); + } + + + @Override + public void addTuple(Tuple tuple) throws IOException { + Datum datum; + int rowBytes = 0; + + for (int i = 0; i < columnNum; i++) { + datum = tuple.get(i); + rowBytes += serde.serialize(schema.getColumn(i), datum, os, nullChars); + + if(columnNum - 1 > i){ + os.write((byte) delimiter); + rowBytes += 1; + } + if (isShuffle) { + // it is to calculate min/max values, and it is only used for the intermediate file. + stats.analyzeField(i, datum); + } + } + os.write(LF); + rowBytes += 1; + + pos += rowBytes; + bufferedBytes += rowBytes; + if(bufferedBytes > BUFFER_SIZE){ + flushBuffer(); + } + // Statistical section + if (enabledStats) { + stats.incrementRow(); + } + } + + private void flushBuffer() throws IOException { + if(os.getLength() > 0) { + os.writeTo(outputStream); + os.reset(); + bufferedBytes = 0; + } + } + @Override + public long getOffset() throws IOException { + return pos; + } + + @Override + public void flush() throws IOException { + flushBuffer(); + outputStream.flush(); + } + + @Override + public void close() throws IOException { + + try { + flush(); + + // Statistical section + if (enabledStats) { + stats.setNumBytes(getOffset()); + } + + if(deflateFilter != null) { + deflateFilter.finish(); + deflateFilter.resetState(); + deflateFilter = null; + } + + os.close(); + } finally { + IOUtils.cleanup(LOG, fos); + if (compressor != null) { + CodecPool.returnCompressor(compressor); + compressor = null; + } + } + } + + @Override + public TableStats getStats() { + if (enabledStats) { + return stats.getTableStat(); + } else { + return null; + } + } + + public boolean isCompress() { + return compressor != null; + } + + public String getExtension() { + return codec != null ?
codec.getDefaultExtension() : ""; + } + } + + public static class CSVScanner extends FileScanner implements SeekableScanner { + public CSVScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) + throws IOException { + super(conf, schema, meta, fragment); + factory = new CompressionCodecFactory(conf); + codec = factory.getCodec(this.fragment.getPath()); + if (codec == null || codec instanceof SplittableCompressionCodec) { + splittable = true; + } + + //Delimiter + this.delimiter = StringEscapeUtils.unescapeJava( + meta.getOption(StorageConstants.TEXT_DELIMITER, + meta.getOption(StorageConstants.CSVFILE_DELIMITER, StorageConstants.DEFAULT_FIELD_DELIMITER))).charAt(0); + + String nullCharacters = StringEscapeUtils.unescapeJava( + meta.getOption(StorageConstants.TEXT_NULL, + meta.getOption(StorageConstants.CSVFILE_NULL, NullDatum.DEFAULT_TEXT))); + + if (StringUtils.isEmpty(nullCharacters)) { + nullChars = NullDatum.get().asTextBytes(); + } else { + nullChars = nullCharacters.getBytes(); + } + } + + private final static int DEFAULT_PAGE_SIZE = 256 * 1024; + private char delimiter; + private FileSystem fs; + private FSDataInputStream fis; + private InputStream is; //decompressed stream + private CompressionCodecFactory factory; + private CompressionCodec codec; + private Decompressor decompressor; + private Seekable filePosition; + private boolean splittable = false; + private long startOffset, end, pos; + private int currentIdx = 0, validIdx = 0, recordCount = 0; + private int[] targetColumnIndexes; + private boolean eof = false; + private final byte[] nullChars; + private SplitLineReader reader; + private ArrayList<Long> fileOffsets; + private ArrayList<Integer> rowLengthList; + private ArrayList<Integer> startOffsets; + private NonSyncByteArrayOutputStream buffer; + private SerializerDeserializer serde; + + @Override + public void init() throws IOException { + fileOffsets = new ArrayList<Long>(); + rowLengthList = new ArrayList<Integer>(); + startOffsets = new ArrayList<Integer>(); + buffer = new NonSyncByteArrayOutputStream(DEFAULT_PAGE_SIZE); + + // FileFragment information + if(fs == null) { + fs = FileScanner.getFileSystem((TajoConf)conf, fragment.getPath()); + } + if(fis == null) fis = fs.open(fragment.getPath()); + + recordCount = 0; + pos = startOffset = fragment.getStartKey(); + end = startOffset + fragment.getLength(); + + if (codec != null) { + decompressor = CodecPool.getDecompressor(codec); + if (codec instanceof SplittableCompressionCodec) { + SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec).createInputStream( + fis, decompressor, startOffset, end, + SplittableCompressionCodec.READ_MODE.BYBLOCK); + + reader = new CompressedSplitLineReader(cIn, conf, null); + startOffset = cIn.getAdjustedStart(); + end = cIn.getAdjustedEnd(); + filePosition = cIn; + is = cIn; + } else { + is = new DataInputStream(codec.createInputStream(fis, decompressor)); + reader = new SplitLineReader(is, null); + filePosition = fis; + } + } else { + fis.seek(startOffset); + filePosition = fis; + is = fis; + reader = new SplitLineReader(is, null); + } + + if (targets == null) { + targets = schema.toArray(); + } + + targetColumnIndexes = new int[targets.length]; + for (int i = 0; i < targets.length; i++) { + targetColumnIndexes[i] = schema.getColumnId(targets[i].getQualifiedName()); + } + + try { + //FIXME + String serdeClass = this.meta.getOption(StorageConstants.CSVFILE_SERDE, + TextSerializerDeserializer.class.getName()); + serde =
(SerializerDeserializer) Class.forName(serdeClass).newInstance(); + } catch (Exception e) { + LOG.error(e.getMessage(), e); + throw new IOException(e); + } + + super.init(); + Arrays.sort(targetColumnIndexes); + if (LOG.isDebugEnabled()) { + LOG.debug("CSVScanner open:" + fragment.getPath() + "," + startOffset + "," + end + + "," + fs.getFileStatus(fragment.getPath()).getLen()); + } + + if (startOffset != 0) { + pos += reader.readLine(new Text(), 0, maxBytesToConsume(pos)); + } + eof = false; + page(); + } + + private int maxBytesToConsume(long pos) { + return isCompress() ? Integer.MAX_VALUE : (int) Math.min(Integer.MAX_VALUE, end - pos); + } + + private long fragmentable() throws IOException { + return end - getFilePosition(); + } + + private long getFilePosition() throws IOException { + long retVal; + if (isCompress()) { + retVal = filePosition.getPos(); + } else { + retVal = pos; + } + return retVal; + } + + private void page() throws IOException { +// // Index initialization + currentIdx = 0; + validIdx = 0; + int currentBufferPos = 0; + int bufferedSize = 0; + + buffer.reset(); + startOffsets.clear(); + rowLengthList.clear(); + fileOffsets.clear(); + + if(eof) { + return; + } + + while (DEFAULT_PAGE_SIZE >= bufferedSize){ + + int ret = reader.readDefaultLine(buffer, rowLengthList, Integer.MAX_VALUE, Integer.MAX_VALUE); + + if(ret == 0){ + break; + } else { + fileOffsets.add(pos); + pos += ret; + startOffsets.add(currentBufferPos); + currentBufferPos += rowLengthList.get(rowLengthList.size() - 1); + bufferedSize += ret; + validIdx++; + recordCount++; + } + + if(getFilePosition() > end && !reader.needAdditionalRecordAfterSplit()){ + eof = true; + break; + } + } + if (tableStats != null) { + tableStats.setReadBytes(pos - startOffset); + tableStats.setNumRows(recordCount); + } + } + + @Override + public float getProgress() { + try { + if(eof) { + return 1.0f; + } + long filePos = getFilePosition(); + if (startOffset == filePos) { + return 0.0f; + } else { + long readBytes = filePos - startOffset; + long remainingBytes = Math.max(end - filePos, 0); + return Math.min(1.0f, (float)(readBytes) / (float)(readBytes + remainingBytes)); + } + } catch (IOException e) { + LOG.error(e.getMessage(), e); + return 0.0f; + } + } + + @Override + public Tuple next() throws IOException { + try { + if (currentIdx == validIdx) { + if (eof) { + return null; + } else { + page(); + + if(currentIdx == validIdx){ + return null; + } + } + } + + long offset = -1; + if(!isCompress()){ + offset = fileOffsets.get(currentIdx); + } + + byte[][] cells = BytesUtils.splitPreserveAllTokens(buffer.getData(), startOffsets.get(currentIdx), + rowLengthList.get(currentIdx), delimiter, targetColumnIndexes); + currentIdx++; + return new LazyTuple(schema, cells, offset, nullChars, serde); + } catch (Throwable t) { + LOG.error("Tuple list length: " + (fileOffsets != null ? fileOffsets.size() : 0), t); + LOG.error("Tuple list current index: " + currentIdx, t); + throw new IOException(t); + } + } + + private boolean isCompress() { + return codec != null; + } + + @Override + public void reset() throws IOException { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + + init(); + } + + @Override + public void close() throws IOException { + try { + if (tableStats != null) { + tableStats.setReadBytes(pos - startOffset); //Actual Processed Bytes. 
(decompressed bytes + overhead) + tableStats.setNumRows(recordCount); + } + + IOUtils.cleanup(LOG, reader, is, fis); + fs = null; + is = null; + fis = null; + if (LOG.isDebugEnabled()) { + LOG.debug("CSVScanner processed record:" + recordCount); + } + } finally { + if (decompressor != null) { + CodecPool.returnDecompressor(decompressor); + decompressor = null; + } + } + } + + @Override + public boolean isProjectable() { + return true; + } + + @Override + public boolean isSelectable() { + return false; + } + + @Override + public void setSearchCondition(Object expr) { + } + + @Override + public void seek(long offset) throws IOException { + if(isCompress()) throw new UnsupportedException(); + + int tupleIndex = Arrays.binarySearch(fileOffsets.toArray(), offset); + + if (tupleIndex > -1) { + this.currentIdx = tupleIndex; + } else if (isSplittable() && end >= offset || startOffset <= offset) { + eof = false; + fis.seek(offset); + pos = offset; + reader.reset(); + this.currentIdx = 0; + this.validIdx = 0; + // pageBuffer(); + } else { + throw new IOException("invalid offset " + + " < start : " + startOffset + " , " + + " end : " + end + " , " + + " filePos : " + filePosition.getPos() + " , " + + " input offset : " + offset + " >"); + } + } + + @Override + public long getNextOffset() throws IOException { + if(isCompress()) throw new UnsupportedException(); + + if (this.currentIdx == this.validIdx) { + if (fragmentable() <= 0) { + return -1; + } else { + page(); + if(currentIdx == validIdx) return -1; + } + } + return fileOffsets.get(currentIdx); + } + + @Override + public boolean isSplittable(){ + return splittable; + } + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java new file mode 100644 index 0000000..4f58e68 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/CompressedSplitLineReader.java @@ -0,0 +1,182 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.compress.SplitCompressionInputStream; +import org.apache.tajo.storage.rcfile.NonSyncByteArrayOutputStream; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; + +/** + * Line reader for compressed splits + * + * Reading records from a compressed split is tricky, as the + * LineRecordReader is using the reported compressed input stream + * position directly to determine when a split has ended. In addition the + * compressed input stream is usually faking the actual byte position, often + * updating it only after the first compressed block after the split is + * accessed. + * + * Depending upon where the last compressed block of the split ends relative + * to the record delimiters it can be easy to accidentally drop the last + * record or duplicate the last record between this split and the next. + * + * Split end scenarios: + * + * 1) Last block of split ends in the middle of a record + * Nothing special that needs to be done here, since the compressed input + * stream will report a position after the split end once the record + * is fully read. The consumer of the next split will discard the + * partial record at the start of the split normally, and no data is lost + * or duplicated between the splits. + * + * 2) Last block of split ends in the middle of a delimiter + * The line reader will continue to consume bytes into the next block to + * locate the end of the delimiter. If a custom delimiter is being used + * then the next record must be read by this split or it will be dropped. + * The consumer of the next split will not recognize the partial + * delimiter at the beginning of its split and will discard it along with + * the next record. + * + * However for the default delimiter processing there is a special case + * because CR, LF, and CRLF are all valid record delimiters. If the + * block ends with a CR then the reader must peek at the next byte to see + * if it is an LF and therefore part of the same record delimiter. + * Peeking at the next byte is an access to the next block and triggers + * the stream to report the end of the split. There are two cases based + * on the next byte: + * + * A) The next byte is LF + * The split needs to end after the current record is returned. The + * consumer of the next split will discard the first record, which + * is degenerate since LF is itself a delimiter, and start consuming + * records after that byte. If the current split tries to read + * another record then the record will be duplicated between splits. + * + * B) The next byte is not LF + * The current record will be returned but the stream will report + * the split has ended due to the peek into the next block. If the + * next record is not read then it will be lost, as the consumer of + * the next split will discard it before processing subsequent + * records. Therefore the next record beyond the reported split end + * must be consumed by this split to avoid data loss. + * + * 3) Last block of split ends at the beginning of a delimiter + * This is equivalent to case 1, as the reader will consume bytes into + * the next block and trigger the end of the split. No further records + * should be read as the consumer of the next split will discard the + * (degenerate) record at the beginning of its split. 
+ *
+ * 4) Last block of split ends at the end of a delimiter
+ *      Nothing special needs to be done here.  The reader will not start
+ *      examining the bytes into the next block until the next record is read,
+ *      so the stream will not report the end of the split just yet.  Once the
+ *      next record is read then the next block will be accessed and the
+ *      stream will indicate the end of the split.  The consumer of the next
+ *      split will correctly discard the first record of its split, and no
+ *      data is lost or duplicated.
+ *
+ *      If the default delimiter is used and the block ends at a CR then this
+ *      is treated as case 2 since the reader does not yet know without
+ *      looking at subsequent bytes whether the delimiter has ended.
+ *
+ * NOTE: It is assumed that compressed input streams *never* return bytes from
+ *       multiple compressed blocks from a single read.  Failure to do so will
+ *       violate the buffering performed by this class, as it will access
+ *       bytes into the next block after the split before returning all of the
+ *       records from the previous block.
+ */
+
+public class CompressedSplitLineReader extends SplitLineReader {
+  SplitCompressionInputStream scin;
+  private boolean usingCRLF;
+  private boolean needAdditionalRecord = false;
+  private boolean finished = false;
+
+  public CompressedSplitLineReader(SplitCompressionInputStream in,
+                                   Configuration conf,
+                                   byte[] recordDelimiterBytes)
+      throws IOException {
+    super(in, conf, recordDelimiterBytes);
+    scin = in;
+    usingCRLF = (recordDelimiterBytes == null);
+  }
+
+  @Override
+  protected int fillBuffer(InputStream in, byte[] buffer, boolean inDelimiter)
+      throws IOException {
+    int bytesRead = in.read(buffer);
+
+    // If the split ended in the middle of a record delimiter then we need
+    // to read one additional record, as the consumer of the next split will
+    // not recognize the partial delimiter as a record.
+    // However if using the default delimiter and the next character is a
+    // linefeed then next split will treat it as a delimiter all by itself
+    // and the additional record read should not be performed.
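+    // Concrete illustration of the cases above (editorial note, not in the
+    // original source): if this split's data ended with "...abc\r" under
+    // default-delimiter processing, buffer[0] now holds the first byte peeked
+    // from the next block.  When that byte is '\n' the CRLF pair forms one
+    // delimiter and the next split will discard the degenerate record after
+    // it, so no extra record is read here (case 2A); any other byte means the
+    // bare CR already ended the record, and this split must also consume the
+    // record that the consumer of the next split will discard (case 2B).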
+    if (inDelimiter && bytesRead > 0) {
+      if (usingCRLF) {
+        needAdditionalRecord = (buffer[0] != '\n');
+      } else {
+        needAdditionalRecord = true;
+      }
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
+      throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readLine(str, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public int readDefaultLine(NonSyncByteArrayOutputStream str, ArrayList<Integer> offsets, int maxLineLength
+      , int maxBytesToConsume) throws IOException {
+    int bytesRead = 0;
+    if (!finished) {
+      // only allow at most one more record to be read after the stream
+      // reports the split ended
+      if (scin.getPos() > scin.getAdjustedEnd()) {
+        finished = true;
+      }
+
+      bytesRead = super.readDefaultLine(str, offsets, maxLineLength, maxBytesToConsume);
+    }
+    return bytesRead;
+  }
+
+  @Override
+  public boolean needAdditionalRecordAfterSplit() {
+    return !finished && needAdditionalRecord;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
new file mode 100644
index 0000000..47f67c6
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileAppender.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.QueryUnitAttemptId;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.conf.TajoConf;
+
+import java.io.IOException;
+
+public abstract class FileAppender implements Appender {
+  private static final Log LOG = LogFactory.getLog(FileAppender.class);
+
+  protected boolean inited = false;
+
+  protected final Configuration conf;
+  protected final TableMeta meta;
+  protected final Schema schema;
+  protected final Path workDir;
+  protected final QueryUnitAttemptId taskAttemptId;
+
+  protected boolean enabledStats;
+  protected Path path;
+
+  public FileAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, Schema schema,
+                      TableMeta meta, Path workDir) {
+    this.conf = conf;
+    this.meta = meta;
+    this.schema = schema;
+    this.workDir = workDir;
+    this.taskAttemptId = taskAttemptId;
+
+    try {
+      if (taskAttemptId != null) {
+        this.path = ((FileStorageManager)StorageManager.getFileStorageManager((TajoConf) conf))
+            .getAppenderFilePath(taskAttemptId, workDir);
+      } else {
+        this.path = workDir;
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      throw new IllegalStateException("Error while opening FileAppender: " + e.getMessage(), e);
+    }
+  }
+
+  public void init() throws IOException {
+    if (inited) {
+      throw new IllegalStateException("FileAppender is already initialized.");
+    }
+    inited = true;
+  }
+
+  public void enableStats() {
+    if (inited) {
+      throw new IllegalStateException("Should enable this option before init()");
+    }
+
+    this.enabledStats = true;
+  }
+
+  public long getEstimatedOutputSize() throws IOException {
+    return getOffset();
+  }
+
+  public abstract long getOffset() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
new file mode 100644
index 0000000..038f0f4
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.ColumnStats;
+import org.apache.tajo.catalog.statistics.TableStats;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.storage.fragment.Fragment;
+
+import java.io.IOException;
+
+public abstract class FileScanner implements Scanner {
+  private static final Log LOG = LogFactory.getLog(FileScanner.class);
+
+  protected boolean inited = false;
+  protected final Configuration conf;
+  protected final TableMeta meta;
+  protected final Schema schema;
+  protected final FileFragment fragment;
+  protected final int columnNum;
+
+  protected Column [] targets;
+
+  protected float progress;
+
+  protected TableStats tableStats;
+
+  public FileScanner(Configuration conf, final Schema schema, final TableMeta meta, final Fragment fragment) {
+    this.conf = conf;
+    this.meta = meta;
+    this.schema = schema;
+    this.fragment = (FileFragment)fragment;
+    this.tableStats = new TableStats();
+    this.columnNum = this.schema.size();
+  }
+
+  public void init() throws IOException {
+    inited = true;
+    progress = 0.0f;
+
+    if (fragment != null) {
+      tableStats.setNumBytes(fragment.getLength());
+      tableStats.setNumBlocks(1);
+    }
+
+    if (schema != null) {
+      for (Column eachColumn : schema.getColumns()) {
+        ColumnStats columnStats = new ColumnStats(eachColumn);
+        tableStats.addColumnStat(columnStats);
+      }
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public void setTarget(Column[] targets) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+    this.targets = targets;
+  }
+
+  public void setSearchCondition(Object expr) {
+    if (inited) {
+      throw new IllegalStateException("Should be called before init()");
+    }
+  }
+
+  public static FileSystem getFileSystem(TajoConf tajoConf, Path path) throws IOException {
+    String tajoUser = tajoConf.getVar(TajoConf.ConfVars.USERNAME);
+    FileSystem fs;
+    if (tajoUser != null) {
+      try {
+        fs = FileSystem.get(path.toUri(), tajoConf, tajoUser);
+      } catch (InterruptedException e) {
+        LOG.warn("InterruptedException occurred while initializing FileSystem with user [" + tajoUser + "]");
+        fs = FileSystem.get(path.toUri(), tajoConf);
+      }
+    } else {
+      fs = FileSystem.get(path.toUri(), tajoConf);
+    }
+
+    return fs;
+  }
+
+  @Override
+  public float getProgress() {
+    return progress;
+  }
+
+  @Override
+  public TableStats getInputStats() {
+    return tableStats;
+  }
+}
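FileScanner enforces a configure-before-init contract: setTarget() and setSearchCondition() throw IllegalStateException once init() has run. A minimal usage sketch follows; it assumes a concrete subclass (the CSVScanner shown earlier) plus an already prepared TajoConf, Schema, TableMeta, and FileFragment, so the variable names are illustrative rather than part of this patch:

    // Hypothetical driver code, not part of this commit: configure, init, iterate.
    FileScanner scanner = new CSVFile.CSVScanner(conf, schema, meta, fragment);
    scanner.setTarget(schema.toArray());   // projection must be set before init()
    scanner.init();                        // seeds TableStats from fragment and schema
    Tuple tuple;
    while ((tuple = scanner.next()) != null) {  // next() returns null at end of input
      // consume tuple
    }
    scanner.close();
    TableStats stats = scanner.getInputStats(); // numBytes/numBlocks were set in init()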

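The write side follows the same ordering: FileAppender only honors enableStats() before init(). A matching appender sketch under the same assumptions; CSVFile.CSVAppender is used here as one plausible concrete FileAppender, and taskAttemptId, workDir, and tuple are assumed to come from the running task:

    // Hypothetical driver code: stats collection must be enabled before init().
    FileAppender appender = new CSVFile.CSVAppender(conf, taskAttemptId, schema, meta, workDir);
    appender.enableStats();    // throws IllegalStateException if called after init()
    appender.init();
    appender.addTuple(tuple);  // once per output row
    appender.flush();
    appender.close();
    TableStats stats = appender.getStats(); // meaningful only when stats were enabled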