http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java deleted file mode 100644 index 69b76c4..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetWriter.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.parquet.ParquetWriter; -import parquet.hadoop.metadata.CompressionCodecName; - -import java.io.IOException; - -/** - * Tajo implementation of {@link ParquetWriter} to write Tajo records to a - * Parquet file. Users should use {@link ParquetAppender} and not this class - * directly. - */ -public class TajoParquetWriter extends ParquetWriter<Tuple> { - /** - * Create a new TajoParquetWriter - * - * @param file The file name to write to. - * @param schema The Tajo schema of the table. - * @param compressionCodecName Compression codec to use, or - * CompressionCodecName.UNCOMPRESSED. - * @param blockSize The block size threshold. - * @param pageSize See parquet write up. Blocks are subdivided into pages - * for alignment. - * @throws IOException - */ - public TajoParquetWriter(Path file, - Schema schema, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize) throws IOException { - super(file, - new TajoWriteSupport(schema), - compressionCodecName, - blockSize, - pageSize); - } - - /** - * Create a new TajoParquetWriter. - * - * @param file The file name to write to. - * @param schema The Tajo schema of the table. - * @param compressionCodecName Compression codec to use, or - * CompressionCodecName.UNCOMPRESSED. - * @param blockSize The block size threshold. - * @param pageSize See parquet write up. Blocks are subdivided into pages - * for alignment. - * @param enableDictionary Whether to use a dictionary to compress columns. - * @param validating Whether to turn on validation. - * @throws IOException - */ - public TajoParquetWriter(Path file, - Schema schema, - CompressionCodecName compressionCodecName, - int blockSize, - int pageSize, - boolean enableDictionary, - boolean validating) throws IOException { - super(file, - new TajoWriteSupport(schema), - compressionCodecName, - blockSize, - pageSize, - enableDictionary, - validating); - } - - /** - * Creates a new TajoParquetWriter. The default block size is 128 MB. 
- * The default page size is 1 MB. Default compression is no compression. - * - * @param file The Path of the file to write to. - * @param schema The Tajo schema of the table. - * @throws IOException - */ - public TajoParquetWriter(Path file, Schema schema) throws IOException { - this(file, - schema, - CompressionCodecName.UNCOMPRESSED, - DEFAULT_BLOCK_SIZE, - DEFAULT_PAGE_SIZE); - } -}
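For context on the class removed above: TajoParquetWriter was normally driven through ParquetAppender rather than used directly, but a minimal direct-usage sketch looks roughly like the following. This assumes the usual Tajo catalog and datum helpers (Schema.addColumn, DatumFactory, VTuple); the example class name, file path, and column names are illustrative only, not part of the original source.

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.tajo.catalog.Schema;
    import org.apache.tajo.common.TajoDataTypes.Type;
    import org.apache.tajo.datum.DatumFactory;
    import org.apache.tajo.storage.Tuple;
    import org.apache.tajo.storage.VTuple;
    import org.apache.tajo.storage.parquet.TajoParquetWriter;

    public class TajoParquetWriterExample {
      public static void main(String[] args) throws IOException {
        // Build the Tajo schema of the table being written.
        Schema schema = new Schema();
        schema.addColumn("id", Type.INT4);
        schema.addColumn("name", Type.TEXT);

        // Open a writer with the default block size, default page size,
        // and no compression (the two-argument constructor shown above).
        TajoParquetWriter writer =
            new TajoParquetWriter(new Path("/tmp/example.parquet"), schema);

        // Write a single record; TajoWriteSupport converts each column
        // to the corresponding Parquet type.
        Tuple tuple = new VTuple(2);
        tuple.put(0, DatumFactory.createInt4(1));
        tuple.put(1, DatumFactory.createText("tajo"));
        writer.write(tuple);
        writer.close();
      }
    }

In the actual storage code path, ParquetAppender wraps essentially this sequence, which is why the writer is rarely constructed by hand.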
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java deleted file mode 100644 index 269f782..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoReadSupport.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import java.util.Map; - -import parquet.Log; -import parquet.hadoop.api.InitContext; -import parquet.hadoop.api.ReadSupport; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.json.CatalogGsonHelper; -import org.apache.tajo.storage.Tuple; - -/** - * Tajo implementation of {@link ReadSupport} for {@link Tuple}s. - * Users should use {@link ParquetScanner} and not this class directly. - */ -public class TajoReadSupport extends ReadSupport<Tuple> { - private static final Log LOG = Log.getLog(TajoReadSupport.class); - - private Schema readSchema; - private Schema requestedSchema; - - /** - * Creates a new TajoReadSupport. - * - * @param requestedSchema The Tajo schema of the requested projection passed - * down by ParquetScanner. - */ - public TajoReadSupport(Schema readSchema, Schema requestedSchema) { - super(); - this.readSchema = readSchema; - this.requestedSchema = requestedSchema; - } - - /** - * Creates a new TajoReadSupport. - * - * @param readSchema The schema of the table. - */ - public TajoReadSupport(Schema readSchema) { - super(); - this.readSchema = readSchema; - this.requestedSchema = readSchema; - } - - /** - * Initializes the ReadSupport. - * - * @param context The InitContext. - * @return A ReadContext that defines how to read the file. - */ - @Override - public ReadContext init(InitContext context) { - if (requestedSchema == null) { - throw new RuntimeException("requestedSchema is null."); - } - MessageType requestedParquetSchema = - new TajoSchemaConverter().convert(requestedSchema); - LOG.debug("Reading data with projection:\n" + requestedParquetSchema); - return new ReadContext(requestedParquetSchema); - } - - /** - * Prepares for read. - * - * @param configuration The job configuration. - * @param keyValueMetaData App-specific metadata from the file. - * @param fileSchema The schema of the Parquet file. - * @param readContext Returned by the init method. 
- */ - @Override - public RecordMaterializer<Tuple> prepareForRead( - Configuration configuration, - Map<String, String> keyValueMetaData, - MessageType fileSchema, - ReadContext readContext) { - MessageType parquetRequestedSchema = readContext.getRequestedSchema(); - return new TajoRecordMaterializer(parquetRequestedSchema, requestedSchema, readSchema); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java deleted file mode 100644 index a091eac..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java +++ /dev/null @@ -1,380 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.datum.*; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import parquet.io.api.Binary; -import parquet.io.api.Converter; -import parquet.io.api.GroupConverter; -import parquet.io.api.PrimitiveConverter; -import parquet.schema.GroupType; -import parquet.schema.Type; - -import java.nio.ByteBuffer; - -/** - * Converter to convert a Parquet record into a Tajo Tuple. - */ -public class TajoRecordConverter extends GroupConverter { - private final GroupType parquetSchema; - private final Schema tajoReadSchema; - private final int[] projectionMap; - private final int tupleSize; - - private final Converter[] converters; - - private Tuple currentTuple; - - /** - * Creates a new TajoRecordConverter. - * - * @param parquetSchema The Parquet schema of the projection. - * @param tajoReadSchema The Tajo schema of the table. - * @param projectionMap An array mapping the projection column to the column - * index in the table. - */ - public TajoRecordConverter(GroupType parquetSchema, Schema tajoReadSchema, - int[] projectionMap) { - this.parquetSchema = parquetSchema; - this.tajoReadSchema = tajoReadSchema; - this.projectionMap = projectionMap; - this.tupleSize = tajoReadSchema.size(); - - // The projectionMap.length does not match parquetSchema.getFieldCount() - // when the projection contains NULL_TYPE columns. 
We will skip over the - // NULL_TYPE columns when we construct the converters and populate the - // NULL_TYPE columns with NullDatums in start(). - int index = 0; - this.converters = new Converter[parquetSchema.getFieldCount()]; - for (int i = 0; i < projectionMap.length; ++i) { - final int projectionIndex = projectionMap[i]; - Column column = tajoReadSchema.getColumn(projectionIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { - continue; - } - Type type = parquetSchema.getType(index); - converters[index] = newConverter(column, type, new ParentValueContainer() { - @Override - void add(Object value) { - TajoRecordConverter.this.set(projectionIndex, value); - } - }); - ++index; - } - } - - private void set(int index, Object value) { - currentTuple.put(index, (Datum)value); - } - - private Converter newConverter(Column column, Type type, - ParentValueContainer parent) { - DataType dataType = column.getDataType(); - switch (dataType.getType()) { - case BOOLEAN: - return new FieldBooleanConverter(parent); - case BIT: - return new FieldBitConverter(parent); - case CHAR: - return new FieldCharConverter(parent); - case INT2: - return new FieldInt2Converter(parent); - case INT4: - return new FieldInt4Converter(parent); - case INT8: - return new FieldInt8Converter(parent); - case FLOAT4: - return new FieldFloat4Converter(parent); - case FLOAT8: - return new FieldFloat8Converter(parent); - case INET4: - return new FieldInet4Converter(parent); - case INET6: - throw new RuntimeException("No converter for INET6"); - case TEXT: - return new FieldTextConverter(parent); - case PROTOBUF: - return new FieldProtobufConverter(parent, dataType); - case BLOB: - return new FieldBlobConverter(parent); - case NULL_TYPE: - throw new RuntimeException("No converter for NULL_TYPE."); - default: - throw new RuntimeException("Unsupported data type"); - } - } - - /** - * Gets the converter for a specific field. - * - * @param fieldIndex Index of the field in the projection. - * @return The converter for the field. - */ - @Override - public Converter getConverter(int fieldIndex) { - return converters[fieldIndex]; - } - - /** - * Called before processing fields. This method fills any fields that have - * NULL values or have type NULL_TYPE with a NullDatum. - */ - @Override - public void start() { - currentTuple = new VTuple(tupleSize); - } - - /** - * Called after all fields have been processed. - */ - @Override - public void end() { - for (int i = 0; i < projectionMap.length; ++i) { - final int projectionIndex = projectionMap[i]; - Column column = tajoReadSchema.getColumn(projectionIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE - || currentTuple.get(projectionIndex) == null) { - set(projectionIndex, NullDatum.get()); - } - } - } - - /** - * Returns the current record converted by this converter. - * - * @return The current record. - */ - public Tuple getCurrentRecord() { - return currentTuple; - } - - static abstract class ParentValueContainer { - /** - * Adds the value to the parent. - * - * @param value The value to add. 
- */ - abstract void add(Object value); - } - - static final class FieldBooleanConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldBooleanConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBoolean(boolean value) { - parent.add(DatumFactory.createBool(value)); - } - } - - static final class FieldBitConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldBitConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createBit((byte)(value & 0xff))); - } - } - - static final class FieldCharConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldCharConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(DatumFactory.createChar(value.getBytes())); - } - } - - static final class FieldInt2Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInt2Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createInt2((short)value)); - } - } - - static final class FieldInt4Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInt4Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createInt4(value)); - } - } - - static final class FieldInt8Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInt8Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addLong(long value) { - parent.add(DatumFactory.createInt8(value)); - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createInt8(Long.valueOf(value))); - } - } - - static final class FieldFloat4Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldFloat4Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createFloat4(Float.valueOf(value))); - } - - @Override - final public void addLong(long value) { - parent.add(DatumFactory.createFloat4(Float.valueOf(value))); - } - - @Override - final public void addFloat(float value) { - parent.add(DatumFactory.createFloat4(value)); - } - } - - static final class FieldFloat8Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldFloat8Converter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addInt(int value) { - parent.add(DatumFactory.createFloat8(Double.valueOf(value))); - } - - @Override - final public void addLong(long value) { - parent.add(DatumFactory.createFloat8(Double.valueOf(value))); - } - - @Override - final public void addFloat(float value) { - parent.add(DatumFactory.createFloat8(Double.valueOf(value))); - } - - @Override - final public void addDouble(double value) { - parent.add(DatumFactory.createFloat8(value)); - } - } - - static final class FieldInet4Converter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldInet4Converter(ParentValueContainer parent) { - this.parent = 
parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(DatumFactory.createInet4(value.getBytes())); - } - } - - static final class FieldTextConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldTextConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(DatumFactory.createText(value.getBytes())); - } - } - - static final class FieldBlobConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - - public FieldBlobConverter(ParentValueContainer parent) { - this.parent = parent; - } - - @Override - final public void addBinary(Binary value) { - parent.add(new BlobDatum(ByteBuffer.wrap(value.getBytes()))); - } - } - - static final class FieldProtobufConverter extends PrimitiveConverter { - private final ParentValueContainer parent; - private final DataType dataType; - - public FieldProtobufConverter(ParentValueContainer parent, - DataType dataType) { - this.parent = parent; - this.dataType = dataType; - } - - @Override - final public void addBinary(Binary value) { - try { - ProtobufDatumFactory factory = - ProtobufDatumFactory.get(dataType.getCode()); - Message.Builder builder = factory.newBuilder(); - builder.mergeFrom(value.getBytes()); - parent.add(factory.createDatum(builder)); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java deleted file mode 100644 index e31828c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import parquet.io.api.GroupConverter; -import parquet.io.api.RecordMaterializer; -import parquet.schema.MessageType; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.storage.Tuple; - -/** - * Materializes a Tajo Tuple from a stream of Parquet data. - */ -class TajoRecordMaterializer extends RecordMaterializer<Tuple> { - private final TajoRecordConverter root; - - /** - * Creates a new TajoRecordMaterializer. - * - * @param parquetSchema The Parquet schema of the projection. - * @param tajoSchema The Tajo schema of the projection. 
- * @param tajoReadSchema The Tajo schema of the table. - */ - public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema, - Schema tajoReadSchema) { - int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema); - this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, - projectionMap); - } - - private int[] getProjectionMap(Schema schema, Schema projection) { - Column[] targets = projection.toArray(); - int[] projectionMap = new int[targets.length]; - for (int i = 0; i < targets.length; ++i) { - int tid = schema.getColumnId(targets[i].getQualifiedName()); - projectionMap[i] = tid; - } - return projectionMap; - } - - /** - * Returns the current record being materialized. - * - * @return The record being materialized. - */ - @Override - public Tuple getCurrentRecord() { - return root.getCurrentRecord(); - } - - /** - * Returns the root converter. - * - * @return The root converter - */ - @Override - public GroupConverter getRootConverter() { - return root; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java deleted file mode 100644 index 2592231..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java +++ /dev/null @@ -1,206 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import parquet.schema.MessageType; -import parquet.schema.OriginalType; -import parquet.schema.PrimitiveType; -import parquet.schema.PrimitiveType.PrimitiveTypeName; -import parquet.schema.Type; - -import java.util.ArrayList; -import java.util.List; - -/** - * Converts between Parquet and Tajo schemas. See package documentation for - * details on the mapping. - */ -public class TajoSchemaConverter { - private static final String TABLE_SCHEMA = "table_schema"; - - /** - * Creates a new TajoSchemaConverter. - */ - public TajoSchemaConverter() { - } - - /** - * Converts a Parquet schema to a Tajo schema. - * - * @param parquetSchema The Parquet schema to convert. - * @return The resulting Tajo schema. 
- */ - public Schema convert(MessageType parquetSchema) { - return convertFields(parquetSchema.getFields()); - } - - private Schema convertFields(List<Type> parquetFields) { - List<Column> columns = new ArrayList<Column>(); - for (int i = 0; i < parquetFields.size(); ++i) { - Type fieldType = parquetFields.get(i); - if (fieldType.isRepetition(Type.Repetition.REPEATED)) { - throw new RuntimeException("REPEATED not supported outside LIST or" + - " MAP. Type: " + fieldType); - } - columns.add(convertField(fieldType)); - } - Column[] columnsArray = new Column[columns.size()]; - columnsArray = columns.toArray(columnsArray); - return new Schema(columnsArray); - } - - private Column convertField(final Type fieldType) { - if (fieldType.isPrimitive()) { - return convertPrimitiveField(fieldType); - } else { - return convertComplexField(fieldType); - } - } - - private Column convertPrimitiveField(final Type fieldType) { - final String fieldName = fieldType.getName(); - final PrimitiveTypeName parquetPrimitiveTypeName = - fieldType.asPrimitiveType().getPrimitiveTypeName(); - final OriginalType originalType = fieldType.getOriginalType(); - return parquetPrimitiveTypeName.convert( - new PrimitiveType.PrimitiveTypeNameConverter<Column, RuntimeException>() { - @Override - public Column convertBOOLEAN(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.BOOLEAN); - } - - @Override - public Column convertINT32(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.INT4); - } - - @Override - public Column convertINT64(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.INT8); - } - - @Override - public Column convertFLOAT(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.FLOAT4); - } - - @Override - public Column convertDOUBLE(PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.FLOAT8); - } - - @Override - public Column convertFIXED_LEN_BYTE_ARRAY( - PrimitiveTypeName primitiveTypeName) { - return new Column(fieldName, TajoDataTypes.Type.BLOB); - } - - @Override - public Column convertBINARY(PrimitiveTypeName primitiveTypeName) { - if (originalType == OriginalType.UTF8) { - return new Column(fieldName, TajoDataTypes.Type.TEXT); - } else { - return new Column(fieldName, TajoDataTypes.Type.BLOB); - } - } - - @Override - public Column convertINT96(PrimitiveTypeName primitiveTypeName) { - throw new RuntimeException("Converting from INT96 not supported."); - } - }); - } - - private Column convertComplexField(final Type fieldType) { - throw new RuntimeException("Complex types not supported."); - } - - /** - * Converts a Tajo schema to a Parquet schema. - * - * @param tajoSchema The Tajo schema to convert. - * @return The resulting Parquet schema. 
- */ - public MessageType convert(Schema tajoSchema) { - List<Type> types = new ArrayList<Type>(); - for (int i = 0; i < tajoSchema.size(); ++i) { - Column column = tajoSchema.getColumn(i); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { - continue; - } - types.add(convertColumn(column)); - } - return new MessageType(TABLE_SCHEMA, types); - } - - private Type convertColumn(Column column) { - TajoDataTypes.Type type = column.getDataType().getType(); - switch (type) { - case BOOLEAN: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BOOLEAN); - case BIT: - case INT2: - case INT4: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.INT32); - case INT8: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.INT64); - case FLOAT4: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.FLOAT); - case FLOAT8: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.DOUBLE); - case CHAR: - case TEXT: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY, - OriginalType.UTF8); - case PROTOBUF: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY); - case BLOB: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY); - case INET4: - case INET6: - return primitive(column.getSimpleName(), - PrimitiveType.PrimitiveTypeName.BINARY); - default: - throw new RuntimeException("Cannot convert Tajo type: " + type); - } - } - - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive) { - return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, null); - } - - private PrimitiveType primitive(String name, - PrimitiveType.PrimitiveTypeName primitive, - OriginalType originalType) { - return new PrimitiveType(Type.Repetition.OPTIONAL, primitive, name, - originalType); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java deleted file mode 100644 index 8651131..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.storage.Tuple; -import parquet.hadoop.api.WriteSupport; -import parquet.io.api.Binary; -import parquet.io.api.RecordConsumer; -import parquet.schema.GroupType; -import parquet.schema.MessageType; -import parquet.schema.Type; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Tajo implementation of {@link WriteSupport} for {@link Tuple}s. - * Users should use {@link ParquetAppender} and not this class directly. - */ -public class TajoWriteSupport extends WriteSupport<Tuple> { - private RecordConsumer recordConsumer; - private MessageType rootSchema; - private Schema rootTajoSchema; - - /** - * Creates a new TajoWriteSupport. - * - * @param tajoSchema The Tajo schema for the table. - */ - public TajoWriteSupport(Schema tajoSchema) { - this.rootSchema = new TajoSchemaConverter().convert(tajoSchema); - this.rootTajoSchema = tajoSchema; - } - - /** - * Initializes the WriteSupport. - * - * @param configuration The job's configuration. - * @return A WriteContext that describes how to write the file. - */ - @Override - public WriteContext init(Configuration configuration) { - Map<String, String> extraMetaData = new HashMap<String, String>(); - return new WriteContext(rootSchema, extraMetaData); - } - - /** - * Called once per row group. - * - * @param recordConsumer The {@link RecordConsumer} to write to. - */ - @Override - public void prepareForWrite(RecordConsumer recordConsumer) { - this.recordConsumer = recordConsumer; - } - - /** - * Writes a Tuple to the file. - * - * @param tuple The Tuple to write to the file. - */ - @Override - public void write(Tuple tuple) { - recordConsumer.startMessage(); - writeRecordFields(rootSchema, rootTajoSchema, tuple); - recordConsumer.endMessage(); - } - - private void writeRecordFields(GroupType schema, Schema tajoSchema, - Tuple tuple) { - List<Type> fields = schema.getFields(); - // Parquet ignores Tajo NULL_TYPE columns, so the index may differ. 
- int index = 0; - for (int tajoIndex = 0; tajoIndex < tajoSchema.size(); ++tajoIndex) { - Column column = tajoSchema.getColumn(tajoIndex); - if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) { - continue; - } - Datum datum = tuple.get(tajoIndex); - Type fieldType = fields.get(index); - if (!tuple.isNull(tajoIndex)) { - recordConsumer.startField(fieldType.getName(), index); - writeValue(fieldType, column, datum); - recordConsumer.endField(fieldType.getName(), index); - } else if (fieldType.isRepetition(Type.Repetition.REQUIRED)) { - throw new RuntimeException("Null-value for required field: " + - column.getSimpleName()); - } - ++index; - } - } - - private void writeValue(Type fieldType, Column column, Datum datum) { - switch (column.getDataType().getType()) { - case BOOLEAN: - recordConsumer.addBoolean(datum.asBool()); - break; - case BIT: - case INT2: - case INT4: - recordConsumer.addInteger(datum.asInt4()); - break; - case INT8: - recordConsumer.addLong(datum.asInt8()); - break; - case FLOAT4: - recordConsumer.addFloat(datum.asFloat4()); - break; - case FLOAT8: - recordConsumer.addDouble(datum.asFloat8()); - break; - case CHAR: - case TEXT: - recordConsumer.addBinary(Binary.fromByteArray(datum.asTextBytes())); - break; - case PROTOBUF: - case BLOB: - case INET4: - case INET6: - recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray())); - break; - default: - break; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java deleted file mode 100644 index d7d16b7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/package-info.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * <p> - * Provides read and write support for Parquet files. Tajo schemas are - * converted to Parquet schemas according to the following mapping of Tajo - * and Parquet types: - * </p> - * - * <table> - * <tr> - * <th>Tajo type</th> - * <th>Parquet type</th> - * </tr> - * <tr> - * <td>NULL_TYPE</td> - * <td>No type. 
The field is not encoded in Parquet.</td> - * </tr> - * <tr> - * <td>BOOLEAN</td> - * <td>BOOLEAN</td> - * </tr> - * <tr> - * <td>BIT</td> - * <td>INT32</td> - * </tr> - * <tr> - * <td>INT2</td> - * <td>INT32</td> - * </tr> - * <tr> - * <td>INT4</td> - * <td>INT32</td> - * </tr> - * <tr> - * <td>INT8</td> - * <td>INT64</td> - * </tr> - * <tr> - * <td>FLOAT4</td> - * <td>FLOAT</td> - * </tr> - * <tr> - * <td>FLOAT8</td> - * <td>DOUBLE</td> - * </tr> - * <tr> - * <td>CHAR</td> - * <td>BINARY (with OriginalType UTF8)</td> - * </tr> - * <tr> - * <td>TEXT</td> - * <td>BINARY (with OriginalType UTF8)</td> - * </tr> - * <tr> - * <td>PROTOBUF</td> - * <td>BINARY</td> - * </tr> - * <tr> - * <td>BLOB</td> - * <td>BINARY</td> - * </tr> - * <tr> - * <td>INET4</td> - * <td>BINARY</td> - * </tr> - * </table> - * - * <p> - * Because Tajo fields can be NULL, all Parquet fields are marked as optional. - * </p> - * - * <p> - * The conversion from Tajo to Parquet is lossy without the original Tajo - * schema. As a result, Parquet files are read using the Tajo schema saved in - * the Tajo catalog for the table the Parquet files belong to, which was - * defined when the table was created. - * </p> - */ - -package org.apache.tajo.storage.parquet; http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java deleted file mode 100644 index 5e200a0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefArrayWritable.java +++ /dev/null @@ -1,261 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import com.google.common.base.Objects; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.io.WritableFactory; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.Arrays; - -/** - * <tt>BytesRefArrayWritable</tt> holds an array reference to BytesRefWritable, - * and is able to resize without recreating new array if not necessary. - * <p> - * - * Each <tt>BytesRefArrayWritable holds</tt> instance has a <i>valid</i> field, - * which is the desired valid number of <tt>BytesRefWritable</tt> it holds. - * <tt>resetValid</tt> can reset the valid, but it will not care the underlying - * BytesRefWritable. 
- */ - -public class BytesRefArrayWritable implements Writable, - Comparable<BytesRefArrayWritable> { - - private BytesRefWritable[] bytesRefWritables = null; - - private int valid = 0; - - /** - * Constructs an empty array with the specified capacity. - * - * @param capacity - * initial capacity - * @exception IllegalArgumentException - * if the specified initial capacity is negative - */ - public BytesRefArrayWritable(int capacity) { - if (capacity < 0) { - throw new IllegalArgumentException("Capacity can not be negative."); - } - bytesRefWritables = new BytesRefWritable[0]; - ensureCapacity(capacity); - } - - /** - * Constructs an empty array with a capacity of ten. - */ - public BytesRefArrayWritable() { - this(10); - } - - /** - * Returns the number of valid elements. - * - * @return the number of valid elements - */ - public int size() { - return valid; - } - - /** - * Gets the BytesRefWritable at the specified position. Make sure the position - * is valid by first call resetValid. - * - * @param index - * the position index, starting from zero - * @throws IndexOutOfBoundsException - */ - public BytesRefWritable get(int index) { - if (index >= valid) { - throw new IndexOutOfBoundsException( - "This BytesRefArrayWritable only has " + valid + " valid values."); - } - return bytesRefWritables[index]; - } - - /** - * Gets the BytesRefWritable at the specified position without checking. - * - * @param index - * the position index, starting from zero - * @throws IndexOutOfBoundsException - */ - public BytesRefWritable unCheckedGet(int index) { - return bytesRefWritables[index]; - } - - /** - * Set the BytesRefWritable at the specified position with the specified - * BytesRefWritable. - * - * @param index - * index position - * @param bytesRefWritable - * the new element - * @throws IllegalArgumentException - * if the specified new element is null - */ - public void set(int index, BytesRefWritable bytesRefWritable) { - if (bytesRefWritable == null) { - throw new IllegalArgumentException("Can not assign null."); - } - ensureCapacity(index + 1); - bytesRefWritables[index] = bytesRefWritable; - if (valid <= index) { - valid = index + 1; - } - } - - /** - * {@inheritDoc} - */ - @Override - public int compareTo(BytesRefArrayWritable other) { - if (other == null) { - throw new IllegalArgumentException("Argument can not be null."); - } - if (this == other) { - return 0; - } - int sizeDiff = valid - other.valid; - if (sizeDiff != 0) { - return sizeDiff; - } - for (int i = 0; i < valid; i++) { - if (other.contains(bytesRefWritables[i])) { - continue; - } else { - return 1; - } - } - return 0; - } - - @Override - public int hashCode(){ - return Objects.hashCode(bytesRefWritables); - } - /** - * Returns <tt>true</tt> if this instance contains one or more the specified - * BytesRefWritable. 
- * - * @param bytesRefWritable - * BytesRefWritable element to be tested - * @return <tt>true</tt> if contains the specified element - * @throws IllegalArgumentException - * if the specified element is null - */ - public boolean contains(BytesRefWritable bytesRefWritable) { - if (bytesRefWritable == null) { - throw new IllegalArgumentException("Argument can not be null."); - } - for (int i = 0; i < valid; i++) { - if (bytesRefWritables[i].equals(bytesRefWritable)) { - return true; - } - } - return false; - } - - /** - * {@inheritDoc} - */ - @Override - public boolean equals(Object o) { - if (o == null || !(o instanceof BytesRefArrayWritable)) { - return false; - } - return compareTo((BytesRefArrayWritable) o) == 0; - } - - /** - * Removes all elements. - */ - public void clear() { - valid = 0; - } - - /** - * enlarge the capacity if necessary, to ensure that it can hold the number of - * elements specified by newValidCapacity argument. It will also narrow the - * valid capacity when needed. Notice: it only enlarge or narrow the valid - * capacity with no care of the already stored invalid BytesRefWritable. - * - * @param newValidCapacity - * the desired capacity - */ - public void resetValid(int newValidCapacity) { - ensureCapacity(newValidCapacity); - valid = newValidCapacity; - } - - protected void ensureCapacity(int newCapacity) { - int size = bytesRefWritables.length; - if (size < newCapacity) { - bytesRefWritables = Arrays.copyOf(bytesRefWritables, newCapacity); - while (size < newCapacity) { - bytesRefWritables[size] = new BytesRefWritable(); - size++; - } - } - } - - /** - * {@inheritDoc} - */ - @Override - public void readFields(DataInput in) throws IOException { - int count = in.readInt(); - ensureCapacity(count); - for (int i = 0; i < count; i++) { - bytesRefWritables[i].readFields(in); - } - valid = count; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(valid); - - for (int i = 0; i < valid; i++) { - BytesRefWritable cu = bytesRefWritables[i]; - cu.write(out); - } - } - - static { - WritableFactories.setFactory(BytesRefArrayWritable.class, - new WritableFactory() { - - @Override - public Writable newInstance() { - return new BytesRefArrayWritable(); - } - - }); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java deleted file mode 100644 index c83b505..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/BytesRefWritable.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparator; -import org.apache.hadoop.io.WritableFactories; -import org.apache.hadoop.io.WritableFactory; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; - -/** - * <tt>BytesRefWritable</tt> referenced a section of byte array. It can be used - * to avoid unnecessary byte copy. - */ -public class BytesRefWritable implements Writable, Comparable<BytesRefWritable> { - - private static final byte[] EMPTY_BYTES = new byte[0]; - public static final BytesRefWritable ZeroBytesRefWritable = new BytesRefWritable(); - - int start = 0; - int length = 0; - byte[] bytes = null; - - LazyDecompressionCallback lazyDecompressObj; - - /** - * Create a zero-size bytes. - */ - public BytesRefWritable() { - this(EMPTY_BYTES); - } - - /** - * Create a BytesRefWritable with <tt>length</tt> bytes. - */ - public BytesRefWritable(int length) { - assert length > 0; - this.length = length; - bytes = new byte[this.length]; - start = 0; - } - - /** - * Create a BytesRefWritable referenced to the given bytes. - */ - public BytesRefWritable(byte[] bytes) { - this.bytes = bytes; - length = bytes.length; - start = 0; - } - - /** - * Create a BytesRefWritable referenced to one section of the given bytes. The - * section is determined by argument <tt>offset</tt> and <tt>len</tt>. - */ - public BytesRefWritable(byte[] data, int offset, int len) { - bytes = data; - start = offset; - length = len; - } - - /** - * Create a BytesRefWritable referenced to one section of the given bytes. The - * argument <tt>lazyDecompressData</tt> refers to a LazyDecompressionCallback - * object. The arguments <tt>offset</tt> and <tt>len</tt> are referred to - * uncompressed bytes of <tt>lazyDecompressData</tt>. Use <tt>offset</tt> and - * <tt>len</tt> after uncompressing the data. - */ - public BytesRefWritable(LazyDecompressionCallback lazyDecompressData, - int offset, int len) { - lazyDecompressObj = lazyDecompressData; - start = offset; - length = len; - } - - private void lazyDecompress() throws IOException { - if (bytes == null && lazyDecompressObj != null) { - bytes = lazyDecompressObj.decompress(); - } - } - - /** - * Returns a copy of the underlying bytes referenced by this instance. - * - * @return a new copied byte array - * @throws IOException - */ - public byte[] getBytesCopy() throws IOException { - lazyDecompress(); - byte[] bb = new byte[length]; - System.arraycopy(bytes, start, bb, 0, length); - return bb; - } - - /** - * Returns the underlying bytes. - * - * @throws IOException - */ - public byte[] getData() throws IOException { - lazyDecompress(); - return bytes; - } - - /** - * readFields() will corrupt the array. So use the set method whenever - * possible. - * - * @see #readFields(DataInput) - */ - public void set(byte[] newData, int offset, int len) { - bytes = newData; - start = offset; - length = len; - lazyDecompressObj = null; - } - - /** - * readFields() will corrupt the array. So use the set method whenever - * possible. 
- * - * @see #readFields(DataInput) - */ - public void set(LazyDecompressionCallback newData, int offset, int len) { - bytes = null; - start = offset; - length = len; - lazyDecompressObj = newData; - } - - public void writeDataTo(DataOutput out) throws IOException { - lazyDecompress(); - out.write(bytes, start, length); - } - - /** - * Always reuse the bytes array if length of bytes array is equal or greater - * to the current record, otherwise create a new one. readFields will corrupt - * the array. Please use set() whenever possible. - * - * @see #set(byte[], int, int) - */ - public void readFields(DataInput in) throws IOException { - int len = in.readInt(); - if (len > bytes.length) { - bytes = new byte[len]; - } - start = 0; - length = len; - in.readFully(bytes, start, length); - } - - /** {@inheritDoc} */ - public void write(DataOutput out) throws IOException { - lazyDecompress(); - out.writeInt(length); - out.write(bytes, start, length); - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - return super.hashCode(); - } - - /** {@inheritDoc} */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(3 * length); - for (int idx = start; idx < length; idx++) { - // if not the first, put a blank separator in - if (idx != 0) { - sb.append(' '); - } - String num = Integer.toHexString(0xff & bytes[idx]); - // if it is only one digit, add a leading 0. - if (num.length() < 2) { - sb.append('0'); - } - sb.append(num); - } - return sb.toString(); - } - - /** {@inheritDoc} */ - @Override - public int compareTo(BytesRefWritable other) { - if (other == null) { - throw new IllegalArgumentException("Argument can not be null."); - } - if (this == other) { - return 0; - } - try { - return WritableComparator.compareBytes(getData(), start, getLength(), - other.getData(), other.start, other.getLength()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object right_obj) { - if (right_obj == null || !(right_obj instanceof BytesRefWritable)) { - return false; - } - return compareTo((BytesRefWritable) right_obj) == 0; - } - - static { - WritableFactories.setFactory(BytesRefWritable.class, new WritableFactory() { - - @Override - public Writable newInstance() { - return new BytesRefWritable(); - } - - }); - } - - public int getLength() { - return length; - } - - public int getStart() { - return start; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java deleted file mode 100644 index 352776f..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/ColumnProjectionUtils.java +++ /dev/null @@ -1,117 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.util.StringUtils; - -import java.util.ArrayList; - -/** - * ColumnProjectionUtils. - * - */ -public final class ColumnProjectionUtils { - - public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; - - /** - * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column - * is included in the list, RCFile's reader will not skip its value. - * - */ - public static void setReadColumnIDs(Configuration conf, ArrayList<Integer> ids) { - String id = toReadColumnIDString(ids); - setReadColumnIDConf(conf, id); - } - - /** - * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column - * is included in the list, RCFile's reader will not skip its value. - * - */ - public static void appendReadColumnIDs(Configuration conf, - ArrayList<Integer> ids) { - String id = toReadColumnIDString(ids); - if (id != null) { - String old = conf.get(READ_COLUMN_IDS_CONF_STR, null); - String newConfStr = id; - if (old != null) { - newConfStr = newConfStr + StringUtils.COMMA_STR + old; - } - - setReadColumnIDConf(conf, newConfStr); - } - } - - private static void setReadColumnIDConf(Configuration conf, String id) { - if (id == null || id.length() <= 0) { - conf.set(READ_COLUMN_IDS_CONF_STR, ""); - return; - } - - conf.set(READ_COLUMN_IDS_CONF_STR, id); - } - - private static String toReadColumnIDString(ArrayList<Integer> ids) { - String id = null; - if (ids != null) { - for (int i = 0; i < ids.size(); i++) { - if (i == 0) { - id = "" + ids.get(i); - } else { - id = id + StringUtils.COMMA_STR + ids.get(i); - } - } - } - return id; - } - - /** - * Returns an array of column ids(start from zero) which is set in the given - * parameter <tt>conf</tt>. - */ - public static ArrayList<Integer> getReadColumnIDs(Configuration conf) { - if (conf == null) { - return new ArrayList<Integer>(0); - } - String skips = conf.get(READ_COLUMN_IDS_CONF_STR, ""); - String[] list = StringUtils.split(skips); - ArrayList<Integer> result = new ArrayList<Integer>(list.length); - for (String element : list) { - // it may contain duplicates, remove duplicates - Integer toAdd = Integer.parseInt(element); - if (!result.contains(toAdd)) { - result.add(toAdd); - } - } - return result; - } - - /** - * Clears the read column ids set in the conf, and will read all columns. 
- */ - public static void setFullyReadColumns(Configuration conf) { - conf.set(READ_COLUMN_IDS_CONF_STR, ""); - } - - private ColumnProjectionUtils() { - // prevent instantiation - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java deleted file mode 100644 index 707d55a..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/LazyDecompressionCallback.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.IOException; - -/** - * Used to call back lazy decompression process. - * - * @see BytesRefWritable - */ -public interface LazyDecompressionCallback { - - byte[] decompress() throws IOException; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java deleted file mode 100644 index bb6af22..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayInputStream.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.ByteArrayInputStream; - -/** - * A thread-not-safe version of ByteArrayInputStream, which removes all - * synchronized modifiers. 
- */ -public class NonSyncByteArrayInputStream extends ByteArrayInputStream { - public NonSyncByteArrayInputStream() { - super(new byte[] {}); - } - - public NonSyncByteArrayInputStream(byte[] bs) { - super(bs); - } - - public NonSyncByteArrayInputStream(byte[] buf, int offset, int length) { - super(buf, offset, length); - } - - public void reset(byte[] input, int start, int length) { - buf = input; - count = start + length; - mark = start; - pos = start; - } - - public int getPosition() { - return pos; - } - - public int getLength() { - return count; - } - - /** - * {@inheritDoc} - */ - @Override - public int read() { - return (pos < count) ? (buf[pos++] & 0xff) : -1; - } - - /** - * {@inheritDoc} - */ - @Override - public int read(byte b[], int off, int len) { - if (b == null) { - throw new NullPointerException(); - } else if (off < 0 || len < 0 || len > b.length - off) { - throw new IndexOutOfBoundsException(); - } - if (pos >= count) { - return -1; - } - if (pos + len > count) { - len = count - pos; - } - if (len <= 0) { - return 0; - } - System.arraycopy(buf, pos, b, off, len); - pos += len; - return len; - } - - /** - * {@inheritDoc} - */ - @Override - public long skip(long n) { - if (pos + n > count) { - n = count - pos; - } - if (n < 0) { - return 0; - } - pos += n; - return n; - } - - /** - * {@inheritDoc} - */ - @Override - public int available() { - return count - pos; - } - - public void seek(int pos) { - this.pos = pos; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java deleted file mode 100644 index 53a3dca..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncByteArrayOutputStream.java +++ /dev/null @@ -1,144 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.ByteArrayOutputStream; -import java.io.DataInput; -import java.io.IOException; -import java.io.OutputStream; - -/** - * A thread-not-safe version of ByteArrayOutputStream, which removes all - * synchronized modifiers. 
- */ -public class NonSyncByteArrayOutputStream extends ByteArrayOutputStream { - public NonSyncByteArrayOutputStream(int size) { - super(size); - } - - public NonSyncByteArrayOutputStream() { - super(64 * 1024); - } - - public byte[] getData() { - return buf; - } - - public int getLength() { - return count; - } - - /** - * {@inheritDoc} - */ - @Override - public void reset() { - count = 0; - } - - public void write(DataInput in, int length) throws IOException { - enLargeBuffer(length); - in.readFully(buf, count, length); - count += length; - } - - private byte[] vLongBytes = new byte[9]; - - public int writeVLongToByteArray(byte[] bytes, int offset, long l) { - if (l >= -112 && l <= 127) { - bytes[offset] = (byte) l; - return 1; - } - - int len = -112; - if (l < 0) { - l ^= -1L; // take one's complement' - len = -120; - } - - long tmp = l; - while (tmp != 0) { - tmp = tmp >> 8; - len--; - } - - bytes[offset++] = (byte) len; - len = (len < -120) ? -(len + 120) : -(len + 112); - - for (int idx = len; idx != 0; idx--) { - int shiftbits = (idx - 1) * 8; - bytes[offset++] = (byte) ((l & (0xFFL << shiftbits)) >> shiftbits); - } - return 1 + len; - } - - public int writeVLong(long l) { - int len = writeVLongToByteArray(vLongBytes, 0, l); - write(vLongBytes, 0, len); - return len; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(int b) { - enLargeBuffer(1); - buf[count] = (byte) b; - count += 1; - } - - private int enLargeBuffer(int increment) { - int temp = count + increment; - int newLen = temp; - if (temp > buf.length) { - if ((buf.length << 1) > temp) { - newLen = buf.length << 1; - } - byte newbuf[] = new byte[newLen]; - System.arraycopy(buf, 0, newbuf, 0, count); - buf = newbuf; - } - return newLen; - } - - /** - * {@inheritDoc} - */ - @Override - public void write(byte b[], int off, int len) { - if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) - || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return; - } - enLargeBuffer(len); - System.arraycopy(b, off, buf, count, len); - count += len; - } - - /** - * {@inheritDoc} - */ - @Override - public void writeTo(OutputStream out) throws IOException { - out.write(buf, 0, count); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java deleted file mode 100644 index 46745ab..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataInputBuffer.java +++ /dev/null @@ -1,507 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import org.apache.hadoop.fs.Seekable; - -import java.io.*; - -/** - * A thread-not-safe version of Hadoop's DataInputBuffer, which removes all - * synchronized modifiers. - */ -public class NonSyncDataInputBuffer extends FilterInputStream implements - DataInput, Seekable { - - private final NonSyncByteArrayInputStream buffer; - - byte[] buff = new byte[16]; - - /** Constructs a new empty buffer. */ - public NonSyncDataInputBuffer() { - this(new NonSyncByteArrayInputStream()); - } - - private NonSyncDataInputBuffer(NonSyncByteArrayInputStream buffer) { - super(buffer); - this.buffer = buffer; - } - - /** Resets the data that the buffer reads. */ - public void reset(byte[] input, int length) { - buffer.reset(input, 0, length); - } - - /** Resets the data that the buffer reads. */ - public void reset(byte[] input, int start, int length) { - buffer.reset(input, start, length); - } - - /** Returns the current position in the input. */ - public int getPosition() { - return buffer.getPosition(); - } - - /** Returns the length of the input. */ - public int getLength() { - return buffer.getLength(); - } - - /** - * Reads bytes from the source stream into the byte array <code>buffer</code>. - * The number of bytes actually read is returned. - * - * @param buffer - * the buffer to read bytes into - * @return the number of bytes actually read or -1 if end of stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - @Override - public final int read(byte[] buffer) throws IOException { - return in.read(buffer, 0, buffer.length); - } - - /** - * Read at most <code>length</code> bytes from this DataInputStream and stores - * them in byte array <code>buffer</code> starting at <code>offset</code>. - * Answer the number of bytes actually read or -1 if no bytes were read and - * end of stream was encountered. - * - * @param buffer - * the byte array in which to store the read bytes. - * @param offset - * the offset in <code>buffer</code> to store the read bytes. - * @param length - * the maximum number of bytes to store in <code>buffer</code>. - * @return the number of bytes actually read or -1 if end of stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - @Deprecated - @Override - public final int read(byte[] buffer, int offset, int length) - throws IOException { - return in.read(buffer, offset, length); - } - - /** - * Reads a boolean from this stream. - * - * @return the next boolean value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final boolean readBoolean() throws IOException { - int temp = in.read(); - if (temp < 0) { - throw new EOFException(); - } - return temp != 0; - } - - /** - * Reads an 8-bit byte value from this stream. - * - * @return the next byte value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final byte readByte() throws IOException { - int temp = in.read(); - if (temp < 0) { - throw new EOFException(); - } - return (byte) temp; - } - - /** - * Reads a 16-bit character value from this stream. - * - * @return the next <code>char</code> value from the source stream. 
- * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - private int readToBuff(int count) throws IOException { - int offset = 0; - - while (offset < count) { - int bytesRead = in.read(buff, offset, count - offset); - if (bytesRead == -1) { - return bytesRead; - } - offset += bytesRead; - } - return offset; - } - - public final char readChar() throws IOException { - if (readToBuff(2) < 0) { - throw new EOFException(); - } - return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff)); - - } - - /** - * Reads a 64-bit <code>double</code> value from this stream. - * - * @return the next <code>double</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final double readDouble() throws IOException { - return Double.longBitsToDouble(readLong()); - } - - /** - * Reads a 32-bit <code>float</code> value from this stream. - * - * @return the next <code>float</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final float readFloat() throws IOException { - return Float.intBitsToFloat(readInt()); - } - - /** - * Reads bytes from this stream into the byte array <code>buffer</code>. This - * method will block until <code>buffer.length</code> number of bytes have - * been read. - * - * @param buffer - * to read bytes into - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final void readFully(byte[] buffer) throws IOException { - readFully(buffer, 0, buffer.length); - } - - /** - * Reads bytes from this stream and stores them in the byte array - * <code>buffer</code> starting at the position <code>offset</code>. This - * method blocks until <code>count</code> bytes have been read. - * - * @param buffer - * the byte array into which the data is read - * @param offset - * the offset the operation start at - * @param length - * the maximum number of bytes to read - * - * @throws java.io.IOException - * if a problem occurs while reading from this stream - * @throws java.io.EOFException - * if reaches the end of the stream before enough bytes have been - * read - */ - public final void readFully(byte[] buffer, int offset, int length) - throws IOException { - if (length < 0) { - throw new IndexOutOfBoundsException(); - } - if (length == 0) { - return; - } - if (in == null || buffer == null) { - throw new NullPointerException("Null Pointer to underlying input stream"); - } - - if (offset < 0 || offset > buffer.length - length) { - throw new IndexOutOfBoundsException(); - } - while (length > 0) { - int result = in.read(buffer, offset, length); - if (result < 0) { - throw new EOFException(); - } - offset += result; - length -= result; - } - } - - /** - * Reads a 32-bit integer value from this stream. - * - * @return the next <code>int</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final int readInt() throws IOException { - if (readToBuff(4) < 0) { - throw new EOFException(); - } - return ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16) - | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff); - } - - /** - * Answers a <code>String</code> representing the next line of text available - * in this BufferedReader. 
A line is represented by 0 or more characters - * followed by <code>'\n'</code>, <code>'\r'</code>, <code>"\n\r"</code> or - * end of stream. The <code>String</code> does not include the newline - * sequence. - * - * @return the contents of the line or null if no characters were read before - * end of stream. - * - * @throws java.io.IOException - * If the DataInputStream is already closed or some other IO error - * occurs. - * - * @deprecated Use BufferedReader - */ - @Deprecated - public final String readLine() throws IOException { - StringBuilder line = new StringBuilder(80); // Typical line length - boolean foundTerminator = false; - while (true) { - int nextByte = in.read(); - switch (nextByte) { - case -1: - if (line.length() == 0 && !foundTerminator) { - return null; - } - return line.toString(); - case (byte) '\r': - if (foundTerminator) { - ((PushbackInputStream) in).unread(nextByte); - return line.toString(); - } - foundTerminator = true; - /* Have to be able to peek ahead one byte */ - if (!(in.getClass() == PushbackInputStream.class)) { - in = new PushbackInputStream(in); - } - break; - case (byte) '\n': - return line.toString(); - default: - if (foundTerminator) { - ((PushbackInputStream) in).unread(nextByte); - return line.toString(); - } - line.append((char) nextByte); - } - } - } - - /** - * Reads a 64-bit <code>long</code> value from this stream. - * - * @return the next <code>long</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final long readLong() throws IOException { - if (readToBuff(8) < 0) { - throw new EOFException(); - } - int i1 = ((buff[0] & 0xff) << 24) | ((buff[1] & 0xff) << 16) - | ((buff[2] & 0xff) << 8) | (buff[3] & 0xff); - int i2 = ((buff[4] & 0xff) << 24) | ((buff[5] & 0xff) << 16) - | ((buff[6] & 0xff) << 8) | (buff[7] & 0xff); - - return ((i1 & 0xffffffffL) << 32) | (i2 & 0xffffffffL); - } - - /** - * Reads a 16-bit <code>short</code> value from this stream. - * - * @return the next <code>short</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final short readShort() throws IOException { - if (readToBuff(2) < 0) { - throw new EOFException(); - } - return (short) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff)); - } - - /** - * Reads an unsigned 8-bit <code>byte</code> value from this stream and - * returns it as an int. - * - * @return the next unsigned byte value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final int readUnsignedByte() throws IOException { - int temp = in.read(); - if (temp < 0) { - throw new EOFException(); - } - return temp; - } - - /** - * Reads a 16-bit unsigned <code>short</code> value from this stream and - * returns it as an int. - * - * @return the next unsigned <code>short</code> value from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public final int readUnsignedShort() throws IOException { - if (readToBuff(2) < 0) { - throw new EOFException(); - } - return (char) (((buff[0] & 0xff) << 8) | (buff[1] & 0xff)); - } - - /** - * Reads a UTF format String from this Stream. - * - * @return the next UTF String from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. 
- * - */ - public final String readUTF() throws IOException { - return decodeUTF(readUnsignedShort()); - } - - String decodeUTF(int utfSize) throws IOException { - return decodeUTF(utfSize, this); - } - - private static String decodeUTF(int utfSize, DataInput in) throws IOException { - byte[] buf = new byte[utfSize]; - char[] out = new char[utfSize]; - in.readFully(buf, 0, utfSize); - - return convertUTF8WithBuf(buf, out, 0, utfSize); - } - - /** - * Reads a UTF format String from the DataInput Stream <code>in</code>. - * - * @param in - * the input stream to read from - * @return the next UTF String from the source stream. - * - * @throws java.io.IOException - * If a problem occurs reading from this DataInputStream. - * - */ - public static final String readUTF(DataInput in) throws IOException { - return decodeUTF(in.readUnsignedShort(), in); - } - - /** - * Skips <code>count</code> number of bytes in this stream. Subsequent - * <code>read()</code>'s will not return these bytes unless - * <code>reset()</code> is used. - * - * @param count - * the number of bytes to skip. - * @return the number of bytes actually skipped. - * - * @throws java.io.IOException - * If the stream is already closed or another IOException occurs. - */ - public final int skipBytes(int count) throws IOException { - int skipped = 0; - long skip; - while (skipped < count && (skip = in.skip(count - skipped)) != 0) { - skipped += skip; - } - if (skipped < 0) { - throw new EOFException(); - } - return skipped; - } - - public static String convertUTF8WithBuf(byte[] buf, char[] out, int offset, - int utfSize) throws UTFDataFormatException { - int count = 0, s = 0, a; - while (count < utfSize) { - if ((out[s] = (char) buf[offset + count++]) < '\u0080') { - s++; - } else if (((a = out[s]) & 0xe0) == 0xc0) { - if (count >= utfSize) { - throw new UTFDataFormatException(); - } - int b = buf[count++]; - if ((b & 0xC0) != 0x80) { - throw new UTFDataFormatException(); - } - out[s++] = (char) (((a & 0x1F) << 6) | (b & 0x3F)); - } else if ((a & 0xf0) == 0xe0) { - if (count + 1 >= utfSize) { - throw new UTFDataFormatException(); - } - int b = buf[count++]; - int c = buf[count++]; - if (((b & 0xC0) != 0x80) || ((c & 0xC0) != 0x80)) { - throw new UTFDataFormatException(); - } - out[s++] = (char) (((a & 0x0F) << 12) | ((b & 0x3F) << 6) | (c & 0x3F)); - } else { - throw new UTFDataFormatException(); - } - } - return new String(out, 0, s); - } - - @Override - public void seek(long pos) throws IOException { - buffer.seek((int)pos); - } - - @Override - public long getPos() throws IOException { - return buffer.getPosition(); - } - - @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java deleted file mode 100644 index 3944f38..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/rcfile/NonSyncDataOutputBuffer.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.rcfile; - -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.IOException; - -/** - * A thread-not-safe version of Hadoop's DataOutputBuffer, which removes all - * synchronized modifiers. - */ -public class NonSyncDataOutputBuffer extends DataOutputStream { - - private final NonSyncByteArrayOutputStream buffer; - - /** Constructs a new empty buffer. */ - public NonSyncDataOutputBuffer() { - this(new NonSyncByteArrayOutputStream()); - } - - private NonSyncDataOutputBuffer(NonSyncByteArrayOutputStream buffer) { - super(buffer); - this.buffer = buffer; - } - - /** - * Returns the current contents of the buffer. Data is only valid to - * {@link #getLength()}. - */ - public byte[] getData() { - return buffer.getData(); - } - - /** Returns the length of the valid data currently in the buffer. */ - public int getLength() { - return buffer.getLength(); - } - - /** Resets the buffer to empty. */ - public NonSyncDataOutputBuffer reset() { - written = 0; - buffer.reset(); - return this; - } - - /** Writes bytes from a DataInput directly into the buffer. */ - public void write(DataInput in, int length) throws IOException { - buffer.write(in, length); - } - - @Override - public void write(int b) throws IOException { - buffer.write(b); - incCount(1); - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - buffer.write(b, off, len); - incCount(len); - } - - public void writeTo(DataOutputStream out) throws IOException { - buffer.writeTo(out); - } - - private void incCount(int value) { - if (written + value < 0) { - written = Integer.MAX_VALUE; - } else { - written += value; - } - } -}
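For context, a minimal sketch (not part of this commit) of how the rcfile helpers removed above were typically exercised: column projection through ColumnProjectionUtils, and an in-memory round trip through the NonSync buffers. The class name RcFileHelpersSketch is illustrative only; every call used below appears in the deleted sources.

import java.io.IOException;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.storage.rcfile.ColumnProjectionUtils;
import org.apache.tajo.storage.rcfile.NonSyncDataInputBuffer;
import org.apache.tajo.storage.rcfile.NonSyncDataOutputBuffer;

public class RcFileHelpersSketch {
  public static void main(String[] args) throws IOException {
    // 1. Column projection: ask the RCFile reader to materialize only
    //    columns 0 and 2 (ids are zero-based); other columns are skipped.
    Configuration conf = new Configuration();
    ArrayList<Integer> wanted = new ArrayList<Integer>();
    wanted.add(0);
    wanted.add(2);
    ColumnProjectionUtils.setReadColumnIDs(conf, wanted);

    // The reader side recovers the projection (duplicates are dropped).
    ArrayList<Integer> ids = ColumnProjectionUtils.getReadColumnIDs(conf);
    System.out.println(ids);                  // prints [0, 2]

    // Clearing the list means "read every column".
    ColumnProjectionUtils.setFullyReadColumns(conf);

    // 2. NonSync buffers: an unsynchronized, reusable in-memory round trip.
    NonSyncDataOutputBuffer out = new NonSyncDataOutputBuffer();
    out.writeInt(42);
    out.writeUTF("tajo");

    NonSyncDataInputBuffer in = new NonSyncDataInputBuffer();
    in.reset(out.getData(), out.getLength()); // wrap the bytes just written
    System.out.println(in.readInt());         // prints 42
    System.out.println(in.readUTF());         // prints tajo

    out.reset();                              // buffers are reusable across records
  }
}

As their Javadoc notes, the NonSync* classes simply drop the synchronized modifiers of the JDK/Hadoop originals, so they are meant to stay confined to a single reader or writer thread.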
