http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java deleted file mode 100644 index 72472fc..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java +++ /dev/null @@ -1,287 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.avro; - -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.Message; -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileReader; -import org.apache.avro.file.SeekableInput; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericFixed; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.mapred.FsInput; -import org.apache.avro.util.Utf8; -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.datum.*; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.VTuple; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -/** - * FileScanner for reading Avro files - */ -public class AvroScanner extends FileScanner { - private Schema avroSchema; - private List<Schema.Field> avroFields; - private DataFileReader<GenericRecord> dataFileReader; - private int[] projectionMap; - - /** - * Creates a new AvroScanner. - * - * @param conf - * @param schema - * @param meta - * @param fragment - */ - public AvroScanner(Configuration conf, - final org.apache.tajo.catalog.Schema schema, - final TableMeta meta, final Fragment fragment) { - super(conf, schema, meta, fragment); - } - - /** - * Initializes the AvroScanner. 
- */ - @Override - public void init() throws IOException { - if (targets == null) { - targets = schema.toArray(); - } - prepareProjection(targets); - - avroSchema = AvroUtil.getAvroSchema(meta, conf); - avroFields = avroSchema.getFields(); - - DatumReader<GenericRecord> datumReader = - new GenericDatumReader<GenericRecord>(avroSchema); - SeekableInput input = new FsInput(fragment.getPath(), conf); - dataFileReader = new DataFileReader<GenericRecord>(input, datumReader); - super.init(); - } - - private void prepareProjection(Column[] targets) { - projectionMap = new int[targets.length]; - for (int i = 0; i < targets.length; ++i) { - projectionMap[i] = schema.getColumnId(targets[i].getQualifiedName()); - } - } - - private static String fromAvroString(Object value) { - if (value instanceof Utf8) { - Utf8 utf8 = (Utf8)value; - return utf8.toString(); - } - return value.toString(); - } - - private static Schema getNonNull(Schema schema) { - if (!schema.getType().equals(Schema.Type.UNION)) { - return schema; - } - List<Schema> schemas = schema.getTypes(); - if (schemas.size() != 2) { - return schema; - } - if (schemas.get(0).getType().equals(Schema.Type.NULL)) { - return schemas.get(1); - } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { - return schemas.get(0); - } else { - return schema; - } - } - - private Datum convertInt(Object value, TajoDataTypes.Type tajoType) { - int intValue = (Integer)value; - switch (tajoType) { - case BIT: - return DatumFactory.createBit((byte)(intValue & 0xff)); - case INT2: - return DatumFactory.createInt2((short)intValue); - default: - return DatumFactory.createInt4(intValue); - } - } - - private Datum convertBytes(Object value, TajoDataTypes.Type tajoType, - DataType dataType) { - ByteBuffer buffer = (ByteBuffer)value; - byte[] bytes = new byte[buffer.capacity()]; - buffer.get(bytes, 0, bytes.length); - switch (tajoType) { - case INET4: - return DatumFactory.createInet4(bytes); - case PROTOBUF: - try { - 
ProtobufDatumFactory factory = - ProtobufDatumFactory.get(dataType.getCode()); - Message.Builder builder = factory.newBuilder(); - builder.mergeFrom(bytes); - return factory.createDatum(builder); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - default: - return new BlobDatum(bytes); - } - } - - private Datum convertString(Object value, TajoDataTypes.Type tajoType) { - switch (tajoType) { - case CHAR: - return DatumFactory.createChar(fromAvroString(value)); - default: - return DatumFactory.createText(fromAvroString(value)); - } - } - - /** - * Reads the next Tuple from the Avro file. - * - * @return The next Tuple from the Avro file or null if end of file is - * reached. - */ - @Override - public Tuple next() throws IOException { - if (!dataFileReader.hasNext()) { - return null; - } - - Tuple tuple = new VTuple(schema.size()); - GenericRecord record = dataFileReader.next(); - for (int i = 0; i < projectionMap.length; ++i) { - int columnIndex = projectionMap[i]; - Object value = record.get(columnIndex); - if (value == null) { - tuple.put(columnIndex, NullDatum.get()); - continue; - } - - // Get Avro type. - Schema.Field avroField = avroFields.get(columnIndex); - Schema nonNullAvroSchema = getNonNull(avroField.schema()); - Schema.Type avroType = nonNullAvroSchema.getType(); - - // Get Tajo type. 
- Column column = schema.getColumn(columnIndex); - DataType dataType = column.getDataType(); - TajoDataTypes.Type tajoType = dataType.getType(); - switch (avroType) { - case NULL: - tuple.put(columnIndex, NullDatum.get()); - break; - case BOOLEAN: - tuple.put(columnIndex, DatumFactory.createBool((Boolean)value)); - break; - case INT: - tuple.put(columnIndex, convertInt(value, tajoType)); - break; - case LONG: - tuple.put(columnIndex, DatumFactory.createInt8((Long)value)); - break; - case FLOAT: - tuple.put(columnIndex, DatumFactory.createFloat4((Float)value)); - break; - case DOUBLE: - tuple.put(columnIndex, DatumFactory.createFloat8((Double)value)); - break; - case BYTES: - tuple.put(columnIndex, convertBytes(value, tajoType, dataType)); - break; - case STRING: - tuple.put(columnIndex, convertString(value, tajoType)); - break; - case RECORD: - throw new RuntimeException("Avro RECORD not supported."); - case ENUM: - throw new RuntimeException("Avro ENUM not supported."); - case MAP: - throw new RuntimeException("Avro MAP not supported."); - case UNION: - throw new RuntimeException("Avro UNION not supported."); - case FIXED: - tuple.put(columnIndex, new BlobDatum(((GenericFixed)value).bytes())); - break; - default: - throw new RuntimeException("Unknown type."); - } - } - return tuple; - } - - /** - * Resets the scanner - */ - @Override - public void reset() throws IOException { - } - - /** - * Closes the scanner. - */ - @Override - public void close() throws IOException { - if (dataFileReader != null) { - dataFileReader.close(); - } - } - - /** - * Returns whether this scanner is projectable. - * - * @return true - */ - @Override - public boolean isProjectable() { - return true; - } - - /** - * Returns whether this scanner is selectable. - * - * @return false - */ - @Override - public boolean isSelectable() { - return false; - } - - /** - * Returns whether this scanner is splittable. 
- * - * @return false - */ - @Override - public boolean isSplittable() { - return false; - } -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java deleted file mode 100644 index 0d14c3d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroUtil.java +++ /dev/null @@ -1,77 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.avro; - -import org.apache.avro.Schema; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.StorageConstants; - -import java.io.IOException; -import java.io.InputStream; -import java.net.URL; - -public class AvroUtil { - public static Schema getAvroSchema(TableMeta meta, Configuration conf) - throws IOException { - - boolean isSchemaLiteral = meta.containsOption(StorageConstants.AVRO_SCHEMA_LITERAL); - boolean isSchemaUrl = meta.containsOption(StorageConstants.AVRO_SCHEMA_URL); - if (!isSchemaLiteral && !isSchemaUrl) { - throw new RuntimeException("No Avro schema for table."); - } - if (isSchemaLiteral) { - String schema = meta.getOption(StorageConstants.AVRO_SCHEMA_LITERAL); - return new Schema.Parser().parse(schema); - } - - String schemaURL = meta.getOption(StorageConstants.AVRO_SCHEMA_URL); - if (schemaURL.toLowerCase().startsWith("http")) { - return getAvroSchemaFromHttp(schemaURL); - } else { - return getAvroSchemaFromFileSystem(schemaURL, conf); - } - } - - public static Schema getAvroSchemaFromHttp(String schemaURL) throws IOException { - InputStream inputStream = new URL(schemaURL).openStream(); - - try { - return new Schema.Parser().parse(inputStream); - } finally { - IOUtils.closeStream(inputStream); - } - } - - public static Schema getAvroSchemaFromFileSystem(String schemaURL, Configuration conf) throws IOException { - Path schemaPath = new Path(schemaURL); - FileSystem fs = schemaPath.getFileSystem(conf); - FSDataInputStream inputStream = fs.open(schemaPath); - - try { - return new Schema.Parser().parse(inputStream); - } finally { - IOUtils.closeStream(inputStream); - } - } -} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java deleted file mode 100644 index 40d1545..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * <p> - * Provides read and write support for Avro files. 
Avro schemas are - * converted to Tajo schemas according to the following mapping of Avro - * and Tajo types: - * </p> - * - * <table> - * <tr> - * <th>Avro type</th> - * <th>Tajo type</th> - * </tr> - * <tr> - * <td>NULL</td> - * <td>NULL_TYPE</td> - * </tr> - * <tr> - * <td>BOOLEAN</td> - * <td>BOOLEAN</td> - * </tr> - * <tr> - * <td>INT</td> - * <td>INT4</td> - * </tr> - * <tr> - * <td>LONG</td> - * <td>INT8</td> - * </tr> - * <tr> - * <td>FLOAT</td> - * <td>FLOAT4</td> - * </tr> - * <tr> - * <td>DOUBLE</td> - * <td>FLOAT8</td> - * </tr> - * <tr> - * <td>BYTES</td> - * <td>BLOB</td> - * </tr> - * <tr> - * <td>STRING</td> - * <td>TEXT</td> - * </tr> - * <tr> - * <td>FIXED</td> - * <td>BLOB</td> - * </tr> - * <tr> - * <td>RECORD</td> - * <td>Not currently supported.</td> - * </tr> - * <tr> - * <td>ENUM</td> - * <td>Not currently supported.</td> - * </tr> - * <tr> - * <td>MAP</td> - * <td>Not currently supported.</td> - * </tr> - * <tr> - * <td>UNION</td> - * <td>Not currently supported.</td> - * </tr> - * </table> - */ - -package org.apache.tajo.storage.avro; http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java deleted file mode 100644 index baeda8c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.compress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DoNotPool; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A global compressor/decompressor pool used to save and reuse (possibly - * native) compression/decompression codecs. - */ -public final class CodecPool { - private static final Log LOG = LogFactory.getLog(CodecPool.class); - - /** - * A global compressor pool used to save the expensive - * construction/destruction of (possibly native) decompression codecs. - */ - private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL = - new HashMap<Class<Compressor>, List<Compressor>>(); - - /** - * A global decompressor pool used to save the expensive - * construction/destruction of (possibly native) decompression codecs. - */ - private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL = - new HashMap<Class<Decompressor>, List<Decompressor>>(); - - private static <T> T borrow(Map<Class<T>, List<T>> pool, - Class<? 
extends T> codecClass) { - T codec = null; - - // Check if an appropriate codec is available - synchronized (pool) { - if (pool.containsKey(codecClass)) { - List<T> codecList = pool.get(codecClass); - - if (codecList != null) { - synchronized (codecList) { - if (!codecList.isEmpty()) { - codec = codecList.remove(codecList.size() - 1); - } - } - } - } - } - - return codec; - } - - private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { - if (codec != null) { - Class<T> codecClass = (Class<T>) codec.getClass(); - synchronized (pool) { - if (!pool.containsKey(codecClass)) { - pool.put(codecClass, new ArrayList<T>()); - } - - List<T> codecList = pool.get(codecClass); - synchronized (codecList) { - codecList.add(codec); - } - } - } - } - - /** - * Get a {@link Compressor} for the given {@link CompressionCodec} from the - * pool or a new one. - * - * @param codec - * the <code>CompressionCodec</code> for which to get the - * <code>Compressor</code> - * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor - * @return <code>Compressor</code> for the given <code>CompressionCodec</code> - * from the pool or a new one - */ - public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { - Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); - if (compressor == null) { - compressor = codec.createCompressor(); - LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); - } else { - compressor.reinit(conf); - if(LOG.isDebugEnabled()) { - LOG.debug("Got recycled compressor"); - } - } - return compressor; - } - - public static Compressor getCompressor(CompressionCodec codec) { - return getCompressor(codec, null); - } - - /** - * Get a {@link Decompressor} for the given {@link CompressionCodec} from the - * pool or a new one. 
- * - * @param codec - * the <code>CompressionCodec</code> for which to get the - * <code>Decompressor</code> - * @return <code>Decompressor</code> for the given - * <code>CompressionCodec</code> the pool or a new one - */ - public static Decompressor getDecompressor(CompressionCodec codec) { - Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec - .getDecompressorType()); - if (decompressor == null) { - decompressor = codec.createDecompressor(); - LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Got recycled decompressor"); - } - } - return decompressor; - } - - /** - * Return the {@link Compressor} to the pool. - * - * @param compressor - * the <code>Compressor</code> to be returned to the pool - */ - public static void returnCompressor(Compressor compressor) { - if (compressor == null) { - return; - } - // if the compressor can't be reused, don't pool it. - if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { - return; - } - compressor.reset(); - payback(COMPRESSOR_POOL, compressor); - } - - /** - * Return the {@link Decompressor} to the pool. - * - * @param decompressor - * the <code>Decompressor</code> to be returned to the pool - */ - public static void returnDecompressor(Decompressor decompressor) { - if (decompressor == null) { - return; - } - // if the decompressor can't be reused, don't pool it. 
- if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { - return; - } - decompressor.reset(); - payback(DECOMPRESSOR_POOL, decompressor); - } - - private CodecPool() { - // prevent instantiation - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java deleted file mode 100644 index bb035a8..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * - */ -package org.apache.tajo.storage.exception; - -import org.apache.hadoop.fs.Path; - -import java.io.IOException; - -public class AlreadyExistsStorageException extends IOException { - private static final long serialVersionUID = 965518916144019032L; - - - public AlreadyExistsStorageException(String path) { - super("Error: "+path+" alreay exists"); - } - - public AlreadyExistsStorageException(Path path) { - this(path.toString()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java deleted file mode 100644 index a67d1f7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
// - */ - -package org.apache.tajo.storage.exception; - -
/**
 * Checked exception type for an unknown codec, with an optional detail message.
 */
public class UnknownCodecException extends Exception {

  private static final long serialVersionUID = 4287230843540404529L;

  /** Creates the exception without a detail message. */
  public UnknownCodecException() {
    super();
  }

  /**
   * Creates the exception with a detail message.
   *
   * @param message the detail message
   */
  public UnknownCodecException(String message) {
    super(message);
  }
}
// http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java deleted file mode 100644 index d18b5a0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
// - */ - -package org.apache.tajo.storage.exception; - -
/**
 * Checked exception type for an unknown data type, with an optional detail message.
 */
public class UnknownDataTypeException extends Exception {

  private static final long serialVersionUID = -2630390595968966164L;

  /** Creates the exception without a detail message. */
  public UnknownDataTypeException() {
    super();
  }

  /**
   * Creates the exception with a detail message.
   *
   * @param message the detail message
   */
  public UnknownDataTypeException(String message) {
    super(message);
  }
}
// http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java deleted file mode 100644 index 8b197d6..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
// - */ - -/** - * - */ -package org.apache.tajo.storage.exception; - -
/**
 * Unchecked exception type for an unsupported file type, with an optional
 * detail message.
 */
public class UnsupportedFileTypeException extends RuntimeException {
  private static final long serialVersionUID = -8160289695849000342L;

  /** Creates the exception without a detail message. */
  public UnsupportedFileTypeException() {
    super();
  }

  /**
   * Creates the exception with a detail message.
   *
   * @param message the detail message
   */
  public UnsupportedFileTypeException(String message) {
    super(message);
  }
}
// http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java deleted file mode 100644 index 4a83dbf..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ /dev/null @@ -1,237 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.tajo.storage.fragment; - -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.util.TUtil; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable { - @Expose private String tableName; // required - @Expose private Path uri; // required - @Expose public Long startOffset; // required - @Expose public Long length; // required - - private String[] hosts; // Datanode hostnames - @Expose private int[] diskIds; - - public FileFragment(ByteString raw) throws InvalidProtocolBufferException { - FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); - builder.mergeFrom(raw); - builder.build(); - init(builder.build()); - } - - public FileFragment(String tableName, Path uri, BlockLocation blockLocation) - throws IOException { - this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null); - } - - public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) { - this.set(tableName, uri, start, length, hosts, diskIds); - } - // Non splittable - public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) { - this.set(tableName, uri, start, length, hosts, null); - } - - public FileFragment(String fragmentId, Path path, long start, long length) { - this.set(fragmentId, path, start, length, null, null); - } - - public 
FileFragment(FileFragmentProto proto) { - init(proto); - } - - private void init(FileFragmentProto proto) { - int[] diskIds = new int[proto.getDiskIdsList().size()]; - int i = 0; - for(Integer eachValue: proto.getDiskIdsList()) { - diskIds[i++] = eachValue; - } - this.set(proto.getId(), new Path(proto.getPath()), - proto.getStartOffset(), proto.getLength(), - proto.getHostsList().toArray(new String[]{}), - diskIds); - } - - private void set(String tableName, Path path, long start, - long length, String[] hosts, int[] diskIds) { - this.tableName = tableName; - this.uri = path; - this.startOffset = start; - this.length = length; - this.hosts = hosts; - this.diskIds = diskIds; - } - - - /** - * Get the list of hosts (hostname) hosting this block - */ - public String[] getHosts() { - if (hosts == null) { - this.hosts = new String[0]; - } - return hosts; - } - - /** - * Get the list of Disk Ids - * Unknown disk is -1. Others 0 ~ N - */ - public int[] getDiskIds() { - if (diskIds == null) { - this.diskIds = new int[getHosts().length]; - Arrays.fill(this.diskIds, -1); - } - return diskIds; - } - - public void setDiskIds(int[] diskIds){ - this.diskIds = diskIds; - } - - @Override - public String getTableName() { - return this.tableName; - } - - public Path getPath() { - return this.uri; - } - - public void setPath(Path path) { - this.uri = path; - } - - public Long getStartKey() { - return this.startOffset; - } - - @Override - public String getKey() { - return this.uri.toString(); - } - - @Override - public long getLength() { - return this.length; - } - - @Override - public boolean isEmpty() { - return this.length <= 0; - } - /** - * - * The offset range of tablets <b>MUST NOT</b> be overlapped. - * - * @param t - * @return If the table paths are not same, return -1. 
- */ - @Override - public int compareTo(FileFragment t) { - if (getPath().equals(t.getPath())) { - long diff = this.getStartKey() - t.getStartKey(); - if (diff < 0) { - return -1; - } else if (diff > 0) { - return 1; - } else { - return 0; - } - } else { - return -1; - } - } - - @Override - public boolean equals(Object o) { - if (o instanceof FileFragment) { - FileFragment t = (FileFragment) o; - if (getPath().equals(t.getPath()) - && TUtil.checkEquals(t.getStartKey(), this.getStartKey()) - && TUtil.checkEquals(t.getLength(), this.getLength())) { - return true; - } - } - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(tableName, uri, startOffset, length); - } - - public Object clone() throws CloneNotSupportedException { - FileFragment frag = (FileFragment) super.clone(); - frag.tableName = tableName; - frag.uri = uri; - frag.diskIds = diskIds; - frag.hosts = hosts; - - return frag; - } - - @Override - public String toString() { - return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": " - +getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": " - + getLength() + "}" ; - } - - public FragmentProto getProto() { - FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); - builder.setId(this.tableName); - builder.setStartOffset(this.startOffset); - builder.setLength(this.length); - builder.setPath(this.uri.toString()); - if(diskIds != null) { - List<Integer> idList = new ArrayList<Integer>(); - for(int eachId: diskIds) { - idList.add(eachId); - } - builder.addAllDiskIds(idList); - } - - if(hosts != null) { - builder.addAllHosts(TUtil.newList(hosts)); - } - - FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); - fragmentBuilder.setId(this.tableName); - fragmentBuilder.setStoreType(StoreType.CSV.name()); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - return fragmentBuilder.build(); - } -} 
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java deleted file mode 100644 index ac43197..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.fragment; - -import org.apache.tajo.common.ProtoObject; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public interface Fragment extends ProtoObject<FragmentProto> { - - public abstract String getTableName(); - - @Override - public abstract FragmentProto getProto(); - - public abstract long getLength(); - - public abstract String getKey(); - - public String[] getHosts(); - - public abstract boolean isEmpty(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java deleted file mode 100644 index 07720c7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.fragment; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.annotation.ThreadSafe; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.List; -import java.util.Map; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -@ThreadSafe -public class FragmentConvertor { - /** - * Cache of fragment classes - */ - protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap(); - /** - * Cache of constructors for each class. - */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - /** - * default parameter for all constructors - */ - private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class }; - - public static Class<? extends Fragment> getFragmentClass(Configuration conf, String storeType) - throws IOException { - Class<? 
extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(storeType.toLowerCase()); - if (fragmentClass == null) { - fragmentClass = conf.getClass( - String.format("tajo.storage.fragment.%s.class", storeType.toLowerCase()), null, Fragment.class); - CACHED_FRAGMENT_CLASSES.put(storeType.toLowerCase(), fragmentClass); - } - - if (fragmentClass == null) { - throw new IOException("No such a fragment for " + storeType.toLowerCase()); - } - - return fragmentClass; - } - - public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) { - T result; - try { - Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); - if (constructor == null) { - constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(clazz, constructor); - } - result = constructor.newInstance(new Object[]{fragment.getContents()}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - public static <T extends Fragment> T convert(Configuration conf, FragmentProto fragment) - throws IOException { - Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, fragment.getStoreType().toLowerCase()); - if (fragmentClass == null) { - throw new IOException("No such a fragment class for " + fragment.getStoreType()); - } - return convert(fragmentClass, fragment); - } - - public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments) - throws IOException { - List<T> list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (FragmentProto proto : fragments) { - list.add(convert(clazz, proto)); - } - return list; - } - - public static <T extends Fragment> List<T> convert(Configuration conf, FragmentProto...fragments) throws IOException { - List<T> list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (FragmentProto proto : fragments) { - list.add((T) convert(conf, proto)); - } - return list; 
- } - - public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) { - List<FragmentProto> list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (Fragment fragment : fragments) { - list.add(fragment.getProto()); - } - return list; - } - - public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) { - List<FragmentProto> list = toFragmentProtoList(fragments); - return list.toArray(new FragmentProto[list.size()]); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java deleted file mode 100644 index 8615235..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AbstractHBaseAppender.java +++ /dev/null @@ -1,223 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.hbase; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.storage.Appender; -import org.apache.tajo.storage.TableStatistics; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.util.TUtil; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * An abstract class for HBase appender. - */ -public abstract class AbstractHBaseAppender implements Appender { - protected Configuration conf; - protected Schema schema; - protected TableMeta meta; - protected QueryUnitAttemptId taskAttemptId; - protected Path stagingDir; - protected boolean inited = false; - - protected ColumnMapping columnMapping; - protected TableStatistics stats; - protected boolean enabledStats; - - protected int columnNum; - - protected byte[][][] mappingColumnFamilies; - protected boolean[] isBinaryColumns; - protected boolean[] isRowKeyMappings; - protected boolean[] isColumnKeys; - protected boolean[] isColumnValues; - protected int[] rowKeyFieldIndexes; - protected int[] rowkeyColumnIndexes; - protected char rowKeyDelimiter; - - // the following four variables are used for '<cfname>:key:' or '<cfname>:value:' mapping - protected int[] columnKeyValueDataIndexes; - protected byte[][] columnKeyDatas; - protected byte[][] columnValueDatas; - protected byte[][] columnKeyCfNames; - - protected KeyValue[] keyValues; - - public AbstractHBaseAppender(Configuration conf, QueryUnitAttemptId taskAttemptId, - Schema schema, TableMeta meta, Path stagingDir) { - this.conf = conf; - this.schema = schema; - this.meta = meta; - 
this.stagingDir = stagingDir; - this.taskAttemptId = taskAttemptId; - } - - @Override - public void init() throws IOException { - if (inited) { - throw new IllegalStateException("FileAppender is already initialized."); - } - inited = true; - if (enabledStats) { - stats = new TableStatistics(this.schema); - } - columnMapping = new ColumnMapping(schema, meta); - - mappingColumnFamilies = columnMapping.getMappingColumns(); - - isRowKeyMappings = columnMapping.getIsRowKeyMappings(); - List<Integer> rowkeyColumnIndexList = new ArrayList<Integer>(); - for (int i = 0; i < isRowKeyMappings.length; i++) { - if (isRowKeyMappings[i]) { - rowkeyColumnIndexList.add(i); - } - } - rowkeyColumnIndexes = TUtil.toArray(rowkeyColumnIndexList); - - isBinaryColumns = columnMapping.getIsBinaryColumns(); - isColumnKeys = columnMapping.getIsColumnKeys(); - isColumnValues = columnMapping.getIsColumnValues(); - rowKeyDelimiter = columnMapping.getRowKeyDelimiter(); - rowKeyFieldIndexes = columnMapping.getRowKeyFieldIndexes(); - - this.columnNum = schema.size(); - - // In the case of '<cfname>:key:' or '<cfname>:value:' KeyValue object should be set with the qualifier and value - // which are mapped to the same column family. 
- columnKeyValueDataIndexes = new int[isColumnKeys.length]; - int index = 0; - int numKeyValues = 0; - Map<String, Integer> cfNameIndexMap = new HashMap<String, Integer>(); - for (int i = 0; i < isColumnKeys.length; i++) { - if (isRowKeyMappings[i]) { - continue; - } - if (isColumnKeys[i] || isColumnValues[i]) { - String cfName = new String(mappingColumnFamilies[i][0]); - if (!cfNameIndexMap.containsKey(cfName)) { - cfNameIndexMap.put(cfName, index); - columnKeyValueDataIndexes[i] = index; - index++; - numKeyValues++; - } else { - columnKeyValueDataIndexes[i] = cfNameIndexMap.get(cfName); - } - } else { - numKeyValues++; - } - } - columnKeyCfNames = new byte[cfNameIndexMap.size()][]; - for (Map.Entry<String, Integer> entry: cfNameIndexMap.entrySet()) { - columnKeyCfNames[entry.getValue()] = entry.getKey().getBytes(); - } - columnKeyDatas = new byte[cfNameIndexMap.size()][]; - columnValueDatas = new byte[cfNameIndexMap.size()][]; - - keyValues = new KeyValue[numKeyValues]; - } - - private ByteArrayOutputStream bout = new ByteArrayOutputStream(); - - protected byte[] getRowKeyBytes(Tuple tuple) throws IOException { - Datum datum; - byte[] rowkey; - if (rowkeyColumnIndexes.length > 1) { - bout.reset(); - for (int i = 0; i < rowkeyColumnIndexes.length; i++) { - datum = tuple.get(rowkeyColumnIndexes[i]); - if (isBinaryColumns[rowkeyColumnIndexes[i]]) { - rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); - } else { - rowkey = HBaseTextSerializerDeserializer.serialize(schema.getColumn(rowkeyColumnIndexes[i]), datum); - } - bout.write(rowkey); - if (i < rowkeyColumnIndexes.length - 1) { - bout.write(rowKeyDelimiter); - } - } - rowkey = bout.toByteArray(); - } else { - int index = rowkeyColumnIndexes[0]; - datum = tuple.get(index); - if (isBinaryColumns[index]) { - rowkey = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(index), datum); - } else { - rowkey = 
HBaseTextSerializerDeserializer.serialize(schema.getColumn(index), datum); - } - } - - return rowkey; - } - - protected void readKeyValues(Tuple tuple, byte[] rowkey) throws IOException { - int keyValIndex = 0; - for (int i = 0; i < columnNum; i++) { - if (isRowKeyMappings[i]) { - continue; - } - Datum datum = tuple.get(i); - byte[] value; - if (isBinaryColumns[i]) { - value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum); - } else { - value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum); - } - - if (isColumnKeys[i]) { - columnKeyDatas[columnKeyValueDataIndexes[i]] = value; - } else if (isColumnValues[i]) { - columnValueDatas[columnKeyValueDataIndexes[i]] = value; - } else { - keyValues[keyValIndex] = new KeyValue(rowkey, mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value); - keyValIndex++; - } - } - - for (int i = 0; i < columnKeyDatas.length; i++) { - keyValues[keyValIndex++] = new KeyValue(rowkey, columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]); - } - } - - @Override - public void enableStats() { - enabledStats = true; - } - - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java deleted file mode 100644 index 8044494..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/AddSortForInsertRewriter.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.hbase; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.PlanningException; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.logical.SortNode.SortPurpose; -import org.apache.tajo.plan.rewrite.RewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; - -public class AddSortForInsertRewriter implements RewriteRule { - private int[] sortColumnIndexes; - private Column[] sortColumns; - public AddSortForInsertRewriter(TableDesc tableDesc, Column[] sortColumns) { - this.sortColumns = sortColumns; - this.sortColumnIndexes = new int[sortColumns.length]; - - Schema tableSchema = tableDesc.getSchema(); - for (int i = 0; i < sortColumns.length; i++) { - sortColumnIndexes[i] = tableSchema.getColumnId(sortColumns[i].getQualifiedName()); - } - } - - @Override - public String getName() { - return "AddSortForInsertRewriter"; - } - - @Override - public boolean isEligible(LogicalPlan plan) { - StoreType storeType = PlannerUtil.getStoreType(plan); - return storeType != null; - } - - 
@Override - public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { - LogicalRootNode rootNode = plan.getRootBlock().getRoot(); - UnaryNode insertNode = rootNode.getChild(); - LogicalNode childNode = insertNode.getChild(); - - Schema sortSchema = childNode.getOutSchema(); - SortNode sortNode = plan.createNode(SortNode.class); - sortNode.setSortPurpose(SortPurpose.STORAGE_SPECIFIED); - sortNode.setInSchema(sortSchema); - sortNode.setOutSchema(sortSchema); - - SortSpec[] sortSpecs = new SortSpec[sortColumns.length]; - int index = 0; - - for (int i = 0; i < sortColumnIndexes.length; i++) { - Column sortColumn = sortSchema.getColumn(sortColumnIndexes[i]); - if (sortColumn == null) { - throw new PlanningException("Can't fine proper sort column:" + sortColumns[i]); - } - sortSpecs[index++] = new SortSpec(sortColumn, true, true); - } - sortNode.setSortSpecs(sortSpecs); - - sortNode.setChild(insertNode.getChild()); - insertNode.setChild(sortNode); - plan.getRootBlock().registerNode(sortNode); - - return plan; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java deleted file mode 100644 index f80bd5e..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/ColumnMapping.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.hbase; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.util.BytesUtils; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -public class ColumnMapping { - private TableMeta tableMeta; - private Schema schema; - private char rowKeyDelimiter; - - private String hbaseTableName; - - private int[] rowKeyFieldIndexes; - private boolean[] isRowKeyMappings; - private boolean[] isBinaryColumns; - private boolean[] isColumnKeys; - private boolean[] isColumnValues; - - // schema order -> 0: cf name, 1: column name -> name bytes - private byte[][][] mappingColumns; - - private int numRowKeys; - - public ColumnMapping(Schema schema, TableMeta tableMeta) throws IOException { - this.schema = schema; - this.tableMeta = tableMeta; - - init(); - } - - private void init() throws IOException { - hbaseTableName = tableMeta.getOption(HBaseStorageConstants.META_TABLE_KEY); - String delim = tableMeta.getOption(HBaseStorageConstants.META_ROWKEY_DELIMITER, "").trim(); - if (delim.length() > 0) { - rowKeyDelimiter = delim.charAt(0); - } - isRowKeyMappings = new boolean[schema.size()]; - rowKeyFieldIndexes = new int[schema.size()]; - isBinaryColumns = new boolean[schema.size()]; - isColumnKeys = new boolean[schema.size()]; - isColumnValues = new boolean[schema.size()]; - - mappingColumns = new byte[schema.size()][][]; - - for (int i = 0; i < schema.size(); i++) { - rowKeyFieldIndexes[i] = -1; - } - - 
String columnMapping = tableMeta.getOption(HBaseStorageConstants.META_COLUMNS_KEY, ""); - if (columnMapping == null || columnMapping.isEmpty()) { - throw new IOException("'columns' property is required."); - } - - String[] columnMappingTokens = columnMapping.split(","); - - if (columnMappingTokens.length != schema.getColumns().size()) { - throw new IOException("The number of mapped HBase columns is great than the number of Tajo table columns"); - } - - int index = 0; - for (String eachToken: columnMappingTokens) { - mappingColumns[index] = new byte[2][]; - - byte[][] mappingTokens = BytesUtils.splitPreserveAllTokens(eachToken.trim().getBytes(), ':'); - - if (mappingTokens.length == 3) { - if (mappingTokens[0].length == 0) { - // cfname - throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " + - "or '<cfname>:value:' or '<cfname>:value:#b'"); - } - //<cfname>:key: or <cfname>:value: - if (mappingTokens[2].length != 0) { - String binaryOption = new String(mappingTokens[2]); - if ("#b".equals(binaryOption)) { - isBinaryColumns[index] = true; - } else { - throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:key:#b' " + - "or '<cfname>:value:' or '<cfname>:value:#b'"); - } - } - mappingColumns[index][0] = mappingTokens[0]; - String keyOrValue = new String(mappingTokens[1]); - if (HBaseStorageConstants.KEY_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) { - isColumnKeys[index] = true; - } else if (HBaseStorageConstants.VALUE_COLUMN_MAPPING.equalsIgnoreCase(keyOrValue)) { - isColumnValues[index] = true; - } else { - throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'"); - } - } else if (mappingTokens.length == 2) { - //<cfname>: or <cfname>:<qualifier> or :key - String cfName = new String(mappingTokens[0]); - String columnName = new String(mappingTokens[1]); - RowKeyMapping rowKeyMapping = getRowKeyMapping(cfName, columnName); - if 
(rowKeyMapping != null) { - isRowKeyMappings[index] = true; - numRowKeys++; - isBinaryColumns[index] = rowKeyMapping.isBinary(); - if (!cfName.isEmpty()) { - if (rowKeyDelimiter == 0) { - throw new IOException("hbase.rowkey.delimiter is required."); - } - rowKeyFieldIndexes[index] = Integer.parseInt(cfName); - } else { - rowKeyFieldIndexes[index] = -1; //rowkey is mapped a single column. - } - } else { - if (cfName.isEmpty()) { - throw new IOException(eachToken + " 'column' attribute should be '<cfname>:key:' or '<cfname>:value:'"); - } - if (cfName != null) { - mappingColumns[index][0] = Bytes.toBytes(cfName); - } - - if (columnName != null && !columnName.isEmpty()) { - String[] columnNameTokens = columnName.split("#"); - if (columnNameTokens[0].isEmpty()) { - mappingColumns[index][1] = null; - } else { - mappingColumns[index][1] = Bytes.toBytes(columnNameTokens[0]); - } - if (columnNameTokens.length == 2 && "b".equals(columnNameTokens[1])) { - isBinaryColumns[index] = true; - } - } - } - } else { - throw new IOException(eachToken + " 'column' attribute '[cfname]:[qualfier]:'"); - } - - index++; - } // for loop - } - - public List<String> getColumnFamilyNames() { - List<String> cfNames = new ArrayList<String>(); - - for (byte[][] eachCfName: mappingColumns) { - if (eachCfName != null && eachCfName.length > 0 && eachCfName[0] != null) { - String cfName = new String(eachCfName[0]); - if (!cfNames.contains(cfName)) { - cfNames.add(cfName); - } - } - } - - return cfNames; - } - - private RowKeyMapping getRowKeyMapping(String cfName, String columnName) { - if (columnName == null || columnName.isEmpty()) { - return null; - } - - String[] tokens = columnName.split("#"); - if (!tokens[0].equalsIgnoreCase(HBaseStorageConstants.KEY_COLUMN_MAPPING)) { - return null; - } - - RowKeyMapping rowKeyMapping = new RowKeyMapping(); - - if (tokens.length == 2 && "b".equals(tokens[1])) { - rowKeyMapping.setBinary(true); - } - - if (cfName != null && !cfName.isEmpty()) { - 
rowKeyMapping.setKeyFieldIndex(Integer.parseInt(cfName)); - } - return rowKeyMapping; - } - - public char getRowKeyDelimiter() { - return rowKeyDelimiter; - } - - public int[] getRowKeyFieldIndexes() { - return rowKeyFieldIndexes; - } - - public boolean[] getIsRowKeyMappings() { - return isRowKeyMappings; - } - - public byte[][][] getMappingColumns() { - return mappingColumns; - } - - public Schema getSchema() { - return schema; - } - - public boolean[] getIsBinaryColumns() { - return isBinaryColumns; - } - - public String getHbaseTableName() { - return hbaseTableName; - } - - public boolean[] getIsColumnKeys() { - return isColumnKeys; - } - - public int getNumRowKeys() { - return numRowKeys; - } - - public boolean[] getIsColumnValues() { - return isColumnValues; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java deleted file mode 100644 index c05c5bb..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseBinarySerializerDeserializer.java +++ /dev/null @@ -1,97 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.hbase; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.util.Bytes; - -import java.io.IOException; - -public class HBaseBinarySerializerDeserializer { - - public static Datum deserialize(Column col, byte[] bytes) throws IOException { - Datum datum; - switch (col.getDataType().getType()) { - case INT1: - case INT2: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt2(Bytes.toShort(bytes)); - break; - case INT4: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt4(Bytes.toInt(bytes)); - break; - case INT8: - if (bytes.length == 4) { - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toInt(bytes)); - } else { - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createInt8(Bytes.toLong(bytes)); - } - break; - case FLOAT4: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat4(Bytes.toFloat(bytes)); - break; - case FLOAT8: - datum = bytes == null || bytes.length == 0 ? NullDatum.get() : DatumFactory.createFloat8(Bytes.toDouble(bytes)); - break; - case TEXT: - datum = bytes == null ? 
NullDatum.get() : DatumFactory.createText(bytes); - break; - default: - datum = NullDatum.get(); - break; - } - return datum; - } - - public static byte[] serialize(Column col, Datum datum) throws IOException { - if (datum == null || datum instanceof NullDatum) { - return null; - } - - byte[] bytes; - switch (col.getDataType().getType()) { - case INT1: - case INT2: - bytes = Bytes.toBytes(datum.asInt2()); - break; - case INT4: - bytes = Bytes.toBytes(datum.asInt4()); - break; - case INT8: - bytes = Bytes.toBytes(datum.asInt8()); - break; - case FLOAT4: - bytes = Bytes.toBytes(datum.asFloat4()); - break; - case FLOAT8: - bytes = Bytes.toBytes(datum.asFloat8()); - break; - case TEXT: - bytes = Bytes.toBytes(datum.asChars()); - break; - default: - bytes = null; - break; - } - - return bytes; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java deleted file mode 100644 index 43ad7f3..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBaseFragment.java +++ /dev/null @@ -1,198 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.hbase; - -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.fragment.StorageFragmentProtos.HBaseFragmentProto; - -public class HBaseFragment implements Fragment, Comparable<HBaseFragment>, Cloneable { - @Expose - private String tableName; - @Expose - private String hbaseTableName; - @Expose - private byte[] startRow; - @Expose - private byte[] stopRow; - @Expose - private String regionLocation; - @Expose - private boolean last; - @Expose - private long length; - - public HBaseFragment(String tableName, String hbaseTableName, byte[] startRow, byte[] stopRow, String regionLocation) { - this.tableName = tableName; - this.hbaseTableName = hbaseTableName; - this.startRow = startRow; - this.stopRow = stopRow; - this.regionLocation = regionLocation; - this.last = false; - } - - public HBaseFragment(ByteString raw) throws InvalidProtocolBufferException { - HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); - builder.mergeFrom(raw); - builder.build(); - init(builder.build()); - } - - private void init(HBaseFragmentProto proto) { - this.tableName = proto.getTableName(); - this.hbaseTableName = proto.getHbaseTableName(); - 
this.startRow = proto.getStartRow().toByteArray(); - this.stopRow = proto.getStopRow().toByteArray(); - this.regionLocation = proto.getRegionLocation(); - this.length = proto.getLength(); - this.last = proto.getLast(); - } - - @Override - public int compareTo(HBaseFragment t) { - return Bytes.compareTo(startRow, t.startRow); - } - - @Override - public String getTableName() { - return tableName; - } - - @Override - public String getKey() { - return new String(startRow); - } - - @Override - public boolean isEmpty() { - return startRow == null || stopRow == null; - } - - @Override - public long getLength() { - return length; - } - - public void setLength(long length) { - this.length = length; - } - - @Override - public String[] getHosts() { - return new String[] {regionLocation}; - } - - public Object clone() throws CloneNotSupportedException { - HBaseFragment frag = (HBaseFragment) super.clone(); - frag.tableName = tableName; - frag.hbaseTableName = hbaseTableName; - frag.startRow = startRow; - frag.stopRow = stopRow; - frag.regionLocation = regionLocation; - frag.last = last; - frag.length = length; - return frag; - } - - @Override - public boolean equals(Object o) { - if (o instanceof HBaseFragment) { - HBaseFragment t = (HBaseFragment) o; - if (tableName.equals(t.tableName) - && Bytes.equals(startRow, t.startRow) - && Bytes.equals(stopRow, t.stopRow)) { - return true; - } - } - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(tableName, hbaseTableName, startRow, stopRow); - } - - @Override - public String toString() { - return "\"fragment\": {\"tableName\": \""+ tableName + "\", hbaseTableName\": \"" + hbaseTableName + "\"" + - ", \"startRow\": \"" + new String(startRow) + "\"" + - ", \"stopRow\": \"" + new String(stopRow) + "\"" + - ", \"length\": \"" + length + "\"}" ; - } - - @Override - public FragmentProto getProto() { - HBaseFragmentProto.Builder builder = HBaseFragmentProto.newBuilder(); - builder.setTableName(tableName) - 
.setHbaseTableName(hbaseTableName) - .setStartRow(ByteString.copyFrom(startRow)) - .setStopRow(ByteString.copyFrom(stopRow)) - .setLast(last) - .setLength(length) - .setRegionLocation(regionLocation); - - FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); - fragmentBuilder.setId(this.tableName); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - fragmentBuilder.setStoreType(StoreType.HBASE.name()); - return fragmentBuilder.build(); - } - - public byte[] getStartRow() { - return startRow; - } - - public byte[] getStopRow() { - return stopRow; - } - - public String getRegionLocation() { - return regionLocation; - } - - public boolean isLast() { - return last; - } - - public void setLast(boolean last) { - this.last = last; - } - - public String getHbaseTableName() { - return hbaseTableName; - } - - public void setHbaseTableName(String hbaseTableName) { - this.hbaseTableName = hbaseTableName; - } - - public void setStartRow(byte[] startRow) { - this.startRow = startRow; - } - - public void setStopRow(byte[] stopRow) { - this.stopRow = stopRow; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java deleted file mode 100644 index 50f61a8..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/hbase/HBasePutAppender.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
/*
 * The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tajo.storage.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.tajo.QueryUnitAttemptId;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
import org.apache.tajo.catalog.proto.CatalogProtos.StoreType;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
import org.apache.tajo.storage.StorageManager;
import org.apache.tajo.storage.Tuple;

import java.io.IOException;

/**
 * Appender that writes each tuple to an HBase table as a {@link Put}, using
 * a client-side write buffer instead of flushing on every row.
 *
 * <p>The column-mapping state used below ({@code columnMapping},
 * {@code isRowKeyMappings}, {@code isBinaryColumns}, {@code columnKeyDatas},
 * ...) is not declared in this class; it is presumably set up by
 * {@code AbstractHBaseAppender} -- confirm against that class.
 */
public class HBasePutAppender extends AbstractHBaseAppender {
  /** Write buffer size handed to the HBase table, in bytes (5 * 1024 * 1024). */
  private static final long WRITE_BUFFER_SIZE = 5L * 1024 * 1024;

  private HTableInterface htable;
  private long totalNumBytes;

  public HBasePutAppender(Configuration conf, QueryUnitAttemptId taskAttemptId,
                          Schema schema, TableMeta meta, Path stagingDir) {
    super(conf, taskAttemptId, schema, meta, stagingDir);
  }

  /**
   * Opens the target HBase table through the HBase storage manager's
   * connection and switches it to buffered writes (auto-flush off).
   *
   * @throws IOException if the parent initialization or the table lookup fails
   */
  @Override
  public void init() throws IOException {
    super.init();

    Configuration hbaseConf = HBaseStorageManager.getHBaseConfiguration(conf, meta);
    HConnection hconn = ((HBaseStorageManager) StorageManager.getStorageManager((TajoConf)conf, StoreType.HBASE))
        .getConnection(hbaseConf);
    htable = hconn.getTable(columnMapping.getHbaseTableName());
    htable.setAutoFlushTo(false);
    htable.setWriteBufferSize(WRITE_BUFFER_SIZE);
  }

  /**
   * Converts one tuple into a {@link Put} and hands it to the table.
   *
   * <p>Row-key columns are skipped in the column loop. Every other column is
   * serialized in binary or text form; the result is either stored as
   * column-key / column-value data (paired up into cells after the loop) or
   * added directly as a cell under its mapped family and qualifier.
   *
   * @param tuple the row to write
   * @throws IOException if serialization or the HBase put fails
   */
  @Override
  public void addTuple(Tuple tuple) throws IOException {
    byte[] rowkey = getRowKeyBytes(tuple);
    totalNumBytes += rowkey.length;
    Put put = new Put(rowkey);
    // NOTE(review): readKeyValues is defined outside this class; its effect on
    // the column key/value buffers is not visible here.
    readKeyValues(tuple, rowkey);

    for (int i = 0; i < columnNum; i++) {
      if (isRowKeyMappings[i]) {
        continue;
      }
      Datum datum = tuple.get(i);
      byte[] value;
      if (isBinaryColumns[i]) {
        value = HBaseBinarySerializerDeserializer.serialize(schema.getColumn(i), datum);
      } else {
        value = HBaseTextSerializerDeserializer.serialize(schema.getColumn(i), datum);
      }

      if (isColumnKeys[i]) {
        columnKeyDatas[columnKeyValueDataIndexes[i]] = value;
      } else if (isColumnValues[i]) {
        columnValueDatas[columnKeyValueDataIndexes[i]] = value;
      } else {
        put.add(mappingColumnFamilies[i][0], mappingColumnFamilies[i][1], value);
        // The binary serializer returns null for NULL datums (the text one is
        // assumed to as well), so the size accounting must tolerate null.
        totalNumBytes += lengthOf(value);
      }
    }

    for (int i = 0; i < columnKeyDatas.length; i++) {
      put.add(columnKeyCfNames[i], columnKeyDatas[i], columnValueDatas[i]);
      totalNumBytes += lengthOf(columnKeyDatas[i]) + lengthOf(columnValueDatas[i]);
    }

    htable.put(put);

    if (enabledStats) {
      stats.incrementRow();
      stats.setNumBytes(totalNumBytes);
    }
  }

  /** Returns the array length, counting a {@code null} array as zero bytes. */
  private static int lengthOf(byte[] bytes) {
    return bytes == null ? 0 : bytes.length;
  }

  /** Pushes all buffered puts to HBase. */
  @Override
  public void flush() throws IOException {
    htable.flushCommits();
  }

  /** Always reports 0; no output size estimate is computed by this appender. */
  @Override
  public long getEstimatedOutputSize() throws IOException {
    return 0;
  }

  /**
   * Flushes pending puts, releases the table and records the final byte count.
   *
   * @throws IOException if flushing or closing the table fails
   */
  @Override
  public void close() throws IOException {
    if (htable != null) {
      try {
        htable.flushCommits();
      } finally {
        // Release the table handle even when the final flush fails.
        htable.close();
      }
    }
    if (enabledStats) {
      stats.setNumBytes(totalNumBytes);
    }
  }
}
