http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java deleted file mode 100644 index 40d1545..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/package-info.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -/** - * <p> - * Provides read and write support for Avro files. 
Avro schemas are - * converted to Tajo schemas according to the following mapping of Avro - * and Tajo types: - * </p> - * - * <table> - * <tr> - * <th>Avro type</th> - * <th>Tajo type</th> - * </tr> - * <tr> - * <td>NULL</td> - * <td>NULL_TYPE</td> - * </tr> - * <tr> - * <td>BOOLEAN</td> - * <td>BOOLEAN</td> - * </tr> - * <tr> - * <td>INT</td> - * <td>INT4</td> - * </tr> - * <tr> - * <td>LONG</td> - * <td>INT8</td> - * </tr> - * <tr> - * <td>FLOAT</td> - * <td>FLOAT4</td> - * </tr> - * <tr> - * <td>DOUBLE</td> - * <td>FLOAT8</td> - * </tr> - * <tr> - * <td>BYTES</td> - * <td>BLOB</td> - * </tr> - * <tr> - * <td>STRING</td> - * <td>TEXT</td> - * </tr> - * <tr> - * <td>FIXED</td> - * <td>BLOB</td> - * </tr> - * <tr> - * <td>RECORD</td> - * <td>Not currently supported</td> - * </tr> - * <tr> - * <td>ENUM</td> - * <td>Not currently supported.</td> - * </tr> - * <tr> - * <td>MAP</td> - * <td>Not currently supported.</td> - * </tr> - * <tr> - * <td>UNION</td> - * <td>Not currently supported.</td> - * </tr> - * </table> - */ - -package org.apache.tajo.storage.avro;
http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java b/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java deleted file mode 100644 index baeda8c..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/compress/CodecPool.java +++ /dev/null @@ -1,185 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.compress; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.io.compress.Compressor; -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.DoNotPool; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * A global compressor/decompressor pool used to save and reuse (possibly - * native) compression/decompression codecs. 
- */ -public final class CodecPool { - private static final Log LOG = LogFactory.getLog(CodecPool.class); - - /** - * A global compressor pool used to save the expensive - * construction/destruction of (possibly native) decompression codecs. - */ - private static final Map<Class<Compressor>, List<Compressor>> COMPRESSOR_POOL = - new HashMap<Class<Compressor>, List<Compressor>>(); - - /** - * A global decompressor pool used to save the expensive - * construction/destruction of (possibly native) decompression codecs. - */ - private static final Map<Class<Decompressor>, List<Decompressor>> DECOMPRESSOR_POOL = - new HashMap<Class<Decompressor>, List<Decompressor>>(); - - private static <T> T borrow(Map<Class<T>, List<T>> pool, - Class<? extends T> codecClass) { - T codec = null; - - // Check if an appropriate codec is available - synchronized (pool) { - if (pool.containsKey(codecClass)) { - List<T> codecList = pool.get(codecClass); - - if (codecList != null) { - synchronized (codecList) { - if (!codecList.isEmpty()) { - codec = codecList.remove(codecList.size() - 1); - } - } - } - } - } - - return codec; - } - - private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) { - if (codec != null) { - Class<T> codecClass = (Class<T>) codec.getClass(); - synchronized (pool) { - if (!pool.containsKey(codecClass)) { - pool.put(codecClass, new ArrayList<T>()); - } - - List<T> codecList = pool.get(codecClass); - synchronized (codecList) { - codecList.add(codec); - } - } - } - } - - /** - * Get a {@link Compressor} for the given {@link CompressionCodec} from the - * pool or a new one. 
- * - * @param codec - * the <code>CompressionCodec</code> for which to get the - * <code>Compressor</code> - * @param conf the <code>Configuration</code> object which contains confs for creating or reinit the compressor - * @return <code>Compressor</code> for the given <code>CompressionCodec</code> - * from the pool or a new one - */ - public static Compressor getCompressor(CompressionCodec codec, Configuration conf) { - Compressor compressor = borrow(COMPRESSOR_POOL, codec.getCompressorType()); - if (compressor == null) { - compressor = codec.createCompressor(); - LOG.info("Got brand-new compressor ["+codec.getDefaultExtension()+"]"); - } else { - compressor.reinit(conf); - if(LOG.isDebugEnabled()) { - LOG.debug("Got recycled compressor"); - } - } - return compressor; - } - - public static Compressor getCompressor(CompressionCodec codec) { - return getCompressor(codec, null); - } - - /** - * Get a {@link Decompressor} for the given {@link CompressionCodec} from the - * pool or a new one. - * - * @param codec - * the <code>CompressionCodec</code> for which to get the - * <code>Decompressor</code> - * @return <code>Decompressor</code> for the given - * <code>CompressionCodec</code> the pool or a new one - */ - public static Decompressor getDecompressor(CompressionCodec codec) { - Decompressor decompressor = borrow(DECOMPRESSOR_POOL, codec - .getDecompressorType()); - if (decompressor == null) { - decompressor = codec.createDecompressor(); - LOG.info("Got brand-new decompressor ["+codec.getDefaultExtension()+"]"); - } else { - if(LOG.isDebugEnabled()) { - LOG.debug("Got recycled decompressor"); - } - } - return decompressor; - } - - /** - * Return the {@link Compressor} to the pool. - * - * @param compressor - * the <code>Compressor</code> to be returned to the pool - */ - public static void returnCompressor(Compressor compressor) { - if (compressor == null) { - return; - } - // if the compressor can't be reused, don't pool it. 
- if (compressor.getClass().isAnnotationPresent(DoNotPool.class)) { - return; - } - compressor.reset(); - payback(COMPRESSOR_POOL, compressor); - } - - /** - * Return the {@link Decompressor} to the pool. - * - * @param decompressor - * the <code>Decompressor</code> to be returned to the pool - */ - public static void returnDecompressor(Decompressor decompressor) { - if (decompressor == null) { - return; - } - // if the decompressor can't be reused, don't pool it. - if (decompressor.getClass().isAnnotationPresent(DoNotPool.class)) { - return; - } - decompressor.reset(); - payback(DECOMPRESSOR_POOL, decompressor); - } - - private CodecPool() { - // prevent instantiation - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java deleted file mode 100644 index bb035a8..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/AlreadyExistsStorageException.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.storage.exception; - -import org.apache.hadoop.fs.Path; - -import java.io.IOException; - -public class AlreadyExistsStorageException extends IOException { - private static final long serialVersionUID = 965518916144019032L; - - - public AlreadyExistsStorageException(String path) { - super("Error: "+path+" alreay exists"); - } - - public AlreadyExistsStorageException(Path path) { - this(path.toString()); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java deleted file mode 100644 index a67d1f7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownCodecException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.exception; - -public class UnknownCodecException extends Exception { - - private static final long serialVersionUID = 4287230843540404529L; - - public UnknownCodecException() { - - } - - public UnknownCodecException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java deleted file mode 100644 index d18b5a0..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnknownDataTypeException.java +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.exception; - -public class UnknownDataTypeException extends Exception { - - private static final long serialVersionUID = -2630390595968966164L; - - public UnknownDataTypeException() { - - } - - public UnknownDataTypeException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java b/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java deleted file mode 100644 index 8b197d6..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/exception/UnsupportedFileTypeException.java +++ /dev/null @@ -1,36 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.storage.exception; - -public class UnsupportedFileTypeException extends RuntimeException { - private static final long serialVersionUID = -8160289695849000342L; - - public UnsupportedFileTypeException() { - } - - /** - * @param message - */ - public UnsupportedFileTypeException(String message) { - super(message); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java deleted file mode 100644 index 6fe6841..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FileFragment.java +++ /dev/null @@ -1,224 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.fragment; - -import com.google.common.base.Objects; -import com.google.gson.annotations.Expose; -import com.google.protobuf.ByteString; -import com.google.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.fs.BlockLocation; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.util.TUtil; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FileFragmentProto; -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public class FileFragment implements Fragment, Comparable<FileFragment>, Cloneable { - @Expose private String tableName; // required - @Expose private Path uri; // required - @Expose private Long startOffset; // required - @Expose private Long length; // required - - private String[] hosts; // Datanode hostnames - @Expose private int[] diskIds; - - public FileFragment(ByteString raw) throws InvalidProtocolBufferException { - FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); - builder.mergeFrom(raw); - builder.build(); - init(builder.build()); - } - - public FileFragment(String tableName, Path uri, BlockLocation blockLocation) - throws IOException { - this.set(tableName, uri, blockLocation.getOffset(), blockLocation.getLength(), blockLocation.getHosts(), null); - } - - public FileFragment(String tableName, Path uri, long start, long length, String[] hosts, int[] diskIds) { - this.set(tableName, uri, start, length, hosts, 
diskIds); - } - // Non splittable - public FileFragment(String tableName, Path uri, long start, long length, String[] hosts) { - this.set(tableName, uri, start, length, hosts, null); - } - - public FileFragment(String fragmentId, Path path, long start, long length) { - this.set(fragmentId, path, start, length, null, null); - } - - public FileFragment(FileFragmentProto proto) { - init(proto); - } - - private void init(FileFragmentProto proto) { - int[] diskIds = new int[proto.getDiskIdsList().size()]; - int i = 0; - for(Integer eachValue: proto.getDiskIdsList()) { - diskIds[i++] = eachValue; - } - this.set(proto.getId(), new Path(proto.getPath()), - proto.getStartOffset(), proto.getLength(), - proto.getHostsList().toArray(new String[]{}), - diskIds); - } - - private void set(String tableName, Path path, long start, - long length, String[] hosts, int[] diskIds) { - this.tableName = tableName; - this.uri = path; - this.startOffset = start; - this.length = length; - this.hosts = hosts; - this.diskIds = diskIds; - } - - - /** - * Get the list of hosts (hostname) hosting this block - */ - public String[] getHosts() { - if (hosts == null) { - this.hosts = new String[0]; - } - return hosts; - } - - /** - * Get the list of Disk Ids - * Unknown disk is -1. Others 0 ~ N - */ - public int[] getDiskIds() { - if (diskIds == null) { - this.diskIds = new int[getHosts().length]; - Arrays.fill(this.diskIds, -1); - } - return diskIds; - } - - public void setDiskIds(int[] diskIds){ - this.diskIds = diskIds; - } - - public String getTableName() { - return this.tableName; - } - - public Path getPath() { - return this.uri; - } - - public void setPath(Path path) { - this.uri = path; - } - - public Long getStartKey() { - return this.startOffset; - } - - public Long getEndKey() { - return this.length; - } - - /** - * - * The offset range of tablets <b>MUST NOT</b> be overlapped. - * - * @param t - * @return If the table paths are not same, return -1. 
- */ - @Override - public int compareTo(FileFragment t) { - if (getPath().equals(t.getPath())) { - long diff = this.getStartKey() - t.getStartKey(); - if (diff < 0) { - return -1; - } else if (diff > 0) { - return 1; - } else { - return 0; - } - } else { - return -1; - } - } - - @Override - public boolean equals(Object o) { - if (o instanceof FileFragment) { - FileFragment t = (FileFragment) o; - if (getPath().equals(t.getPath()) - && TUtil.checkEquals(t.getStartKey(), this.getStartKey()) - && TUtil.checkEquals(t.getEndKey(), this.getEndKey())) { - return true; - } - } - return false; - } - - @Override - public int hashCode() { - return Objects.hashCode(tableName, uri, startOffset, length); - } - - public Object clone() throws CloneNotSupportedException { - FileFragment frag = (FileFragment) super.clone(); - frag.tableName = tableName; - frag.uri = uri; - frag.diskIds = diskIds; - frag.hosts = hosts; - - return frag; - } - - @Override - public String toString() { - return "\"fragment\": {\"id\": \""+ tableName +"\", \"path\": " - +getPath() + "\", \"start\": " + this.getStartKey() + ",\"length\": " - + getEndKey() + "}" ; - } - - public FragmentProto getProto() { - FileFragmentProto.Builder builder = FileFragmentProto.newBuilder(); - builder.setId(this.tableName); - builder.setStartOffset(this.startOffset); - builder.setLength(this.length); - builder.setPath(this.uri.toString()); - if(diskIds != null) { - List<Integer> idList = new ArrayList<Integer>(); - for(int eachId: diskIds) { - idList.add(eachId); - } - builder.addAllDiskIds(idList); - } - - if(hosts != null) { - builder.addAllHosts(TUtil.newList(hosts)); - } - - FragmentProto.Builder fragmentBuilder = FragmentProto.newBuilder(); - fragmentBuilder.setId(this.tableName); - fragmentBuilder.setContents(builder.buildPartial().toByteString()); - return fragmentBuilder.build(); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java deleted file mode 100644 index 3f9c160..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/Fragment.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.fragment; - -import org.apache.tajo.common.ProtoObject; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; - -public interface Fragment extends ProtoObject<FragmentProto> { - - public abstract String getTableName(); - - @Override - public abstract FragmentProto getProto(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java b/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java deleted file mode 100644 index 0315a8d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/fragment/FragmentConvertor.java +++ /dev/null @@ -1,132 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.fragment; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.annotation.ThreadSafe; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.util.List; -import java.util.Map; - -import static org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import static org.apache.tajo.catalog.proto.CatalogProtos.StoreType; - -@ThreadSafe -public class FragmentConvertor { - /** - * Cache of fragment classes - */ - protected static final Map<String, Class<? extends Fragment>> CACHED_FRAGMENT_CLASSES = Maps.newConcurrentMap(); - /** - * Cache of constructors for each class. - */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - /** - * default parameter for all constructors - */ - private static final Class<?>[] DEFAULT_FRAGMENT_PARAMS = { ByteString.class }; - - public static Class<? extends Fragment> getFragmentClass(Configuration conf, StoreType storeType) - throws IOException { - String handlerName = storeType.name().toLowerCase(); - Class<? 
extends Fragment> fragmentClass = CACHED_FRAGMENT_CLASSES.get(handlerName); - if (fragmentClass == null) { - fragmentClass = conf.getClass( - String.format("tajo.storage.fragment.%s.class", storeType.name().toLowerCase()), null, Fragment.class); - CACHED_FRAGMENT_CLASSES.put(handlerName, fragmentClass); - } - - if (fragmentClass == null) { - throw new IOException("No such a fragment for " + storeType.name()); - } - - return fragmentClass; - } - - public static <T extends Fragment> T convert(Class<T> clazz, FragmentProto fragment) { - T result; - try { - Constructor<T> constructor = (Constructor<T>) CONSTRUCTOR_CACHE.get(clazz); - if (constructor == null) { - constructor = clazz.getDeclaredConstructor(DEFAULT_FRAGMENT_PARAMS); - constructor.setAccessible(true); - CONSTRUCTOR_CACHE.put(clazz, constructor); - } - result = constructor.newInstance(new Object[]{fragment.getContents()}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - public static <T extends Fragment> T convert(Configuration conf, StoreType storeType, FragmentProto fragment) - throws IOException { - Class<T> fragmentClass = (Class<T>) getFragmentClass(conf, storeType); - if (fragmentClass == null) { - throw new IOException("No such a fragment class for " + storeType.name()); - } - return convert(fragmentClass, fragment); - } - - public static <T extends Fragment> List<T> convert(Class<T> clazz, FragmentProto...fragments) - throws IOException { - List<T> list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (FragmentProto proto : fragments) { - list.add(convert(clazz, proto)); - } - return list; - } - - public static <T extends Fragment> List<T> convert(Configuration conf, StoreType storeType, - FragmentProto...fragments) throws IOException { - List<T> list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (FragmentProto proto : fragments) { - list.add((T) convert(conf, storeType, proto)); - } - return list; - } 
- - public static List<FragmentProto> toFragmentProtoList(Fragment... fragments) { - List<FragmentProto> list = Lists.newArrayList(); - if (fragments == null) { - return list; - } - for (Fragment fragment : fragments) { - list.add(fragment.getProto()); - } - return list; - } - - public static FragmentProto [] toFragmentProtoArray(Fragment... fragments) { - List<FragmentProto> list = toFragmentProtoList(fragments); - return list.toArray(new FragmentProto[list.size()]); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java deleted file mode 100644 index ccba3be..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexMethod.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.index; - -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.BaseTupleComparator; -import org.apache.tajo.storage.TupleComparator; - -import java.io.IOException; - -public interface IndexMethod { - IndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException; - IndexReader getIndexReader(final Path fileName, Schema keySchema, - TupleComparator comparator) throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java deleted file mode 100644 index 7baf7aa..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexReader.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.index; - -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -public interface IndexReader { - - /** - * Find the offset corresponding to key which is equal to a given key. - * - * @param key - * @return - * @throws IOException - */ - public long find(Tuple key) throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java deleted file mode 100644 index 04738f8..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/IndexWriter.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -/** - * - */ -package org.apache.tajo.storage.index; - -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -public abstract class IndexWriter { - - public abstract void write(Tuple key, long offset) throws IOException; - - public abstract void close() throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java deleted file mode 100644 index 688bbc7..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/OrderIndexReader.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * - */ -package org.apache.tajo.storage.index; - -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -public interface OrderIndexReader extends IndexReader { - /** - * Find the offset corresponding to key which is equal to or greater than - * a given key. 
- * - * @param key to find - * @return - * @throws IOException - */ - public long find(Tuple key, boolean nextKey) throws IOException; - - /** - * Return the next offset from the latest find or next offset - * @return - * @throws IOException - */ - public long next() throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java b/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java deleted file mode 100644 index f093f9d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/index/bst/BSTIndex.java +++ /dev/null @@ -1,623 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.index.bst; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; -import org.apache.tajo.storage.*; -import org.apache.tajo.storage.RowStoreUtil.RowStoreDecoder; -import org.apache.tajo.storage.RowStoreUtil.RowStoreEncoder; -import org.apache.tajo.storage.index.IndexMethod; -import org.apache.tajo.storage.index.IndexWriter; -import org.apache.tajo.storage.index.OrderIndexReader; - -import java.io.Closeable; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.LinkedList; -import java.util.Set; -import java.util.TreeMap; - -import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; - -/** - * This is two-level binary search tree index. This is one of the value-list - * index structure. Thus, it is inefficient in the case where - * the many of the values are same. Also, the BST shows the fast performance - * when the selectivity of rows to be retrieved is less than 5%. - * BSTIndexWriter is not thread-safe, whereas BSTIndexReader is thread-safe. 
- */ -public class BSTIndex implements IndexMethod { - private static final Log LOG = LogFactory.getLog(BSTIndex.class); - - public static final int ONE_LEVEL_INDEX = 1; - public static final int TWO_LEVEL_INDEX = 2; - - private final Configuration conf; - - public BSTIndex(final Configuration conf) { - this.conf = conf; - } - - @Override - public BSTIndexWriter getIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { - return new BSTIndexWriter(fileName, level, keySchema, comparator); - } - - @Override - public BSTIndexReader getIndexReader(Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { - return new BSTIndexReader(fileName, keySchema, comparator); - } - - public BSTIndexReader getIndexReader(Path fileName) throws IOException { - return new BSTIndexReader(fileName); - } - - public class BSTIndexWriter extends IndexWriter implements Closeable { - private FSDataOutputStream out; - private FileSystem fs; - private int level; - private int loadNum = 4096; - private Path fileName; - - private final Schema keySchema; - private final TupleComparator compartor; - private final KeyOffsetCollector collector; - private KeyOffsetCollector rootCollector; - - private Tuple firstKey; - private Tuple lastKey; - - private RowStoreEncoder rowStoreEncoder; - - // private Tuple lastestKey = null; - - /** - * constructor - * - * @param level - * : IndexCreater.ONE_LEVEL_INDEX or IndexCreater.TWO_LEVEL_INDEX - * @throws IOException - */ - public BSTIndexWriter(final Path fileName, int level, Schema keySchema, - TupleComparator comparator) throws IOException { - this.fileName = fileName; - this.level = level; - this.keySchema = keySchema; - this.compartor = comparator; - this.collector = new KeyOffsetCollector(comparator); - this.rowStoreEncoder = RowStoreUtil.createEncoder(keySchema); - } - - public void setLoadNum(int loadNum) { - this.loadNum = loadNum; - } - - public void open() throws 
IOException { - fs = fileName.getFileSystem(conf); - if (fs.exists(fileName)) { - throw new IOException("ERROR: index file (" + fileName + " already exists"); - } - out = fs.create(fileName); - } - - @Override - public void write(Tuple key, long offset) throws IOException { - if (firstKey == null || compartor.compare(key, firstKey) < 0) { - firstKey = key; - } - if (lastKey == null || compartor.compare(lastKey, key) < 0) { - lastKey = key; - } - - collector.put(key, offset); - } - - public TupleComparator getComparator() { - return this.compartor; - } - - public void flush() throws IOException { - out.flush(); - } - - public void writeHeader(int entryNum) throws IOException { - // schema - byte [] schemaBytes = keySchema.getProto().toByteArray(); - out.writeInt(schemaBytes.length); - out.write(schemaBytes); - - // comparator - byte [] comparatorBytes = compartor.getProto().toByteArray(); - out.writeInt(comparatorBytes.length); - out.write(comparatorBytes); - - // level - out.writeInt(this.level); - // entry - out.writeInt(entryNum); - if (entryNum > 0) { - byte [] minBytes = rowStoreEncoder.toBytes(firstKey); - out.writeInt(minBytes.length); - out.write(minBytes); - byte [] maxBytes = rowStoreEncoder.toBytes(lastKey); - out.writeInt(maxBytes.length); - out.write(maxBytes); - } - out.flush(); - } - - public void close() throws IOException { - /* two level initialize */ - if (this.level == TWO_LEVEL_INDEX) { - rootCollector = new KeyOffsetCollector(this.compartor); - } - - /* data writing phase */ - TreeMap<Tuple, LinkedList<Long>> keyOffsetMap = collector.getMap(); - Set<Tuple> keySet = keyOffsetMap.keySet(); - - int entryNum = keySet.size(); - writeHeader(entryNum); - - int loadCount = this.loadNum - 1; - for (Tuple key : keySet) { - - if (this.level == TWO_LEVEL_INDEX) { - loadCount++; - if (loadCount == this.loadNum) { - rootCollector.put(key, out.getPos()); - loadCount = 0; - } - } - /* key writing */ - byte[] buf = rowStoreEncoder.toBytes(key); - 
out.writeInt(buf.length); - out.write(buf); - - /**/ - LinkedList<Long> offsetList = keyOffsetMap.get(key); - /* offset num writing */ - int offsetSize = offsetList.size(); - out.writeInt(offsetSize); - /* offset writing */ - for (Long offset : offsetList) { - out.writeLong(offset); - } - } - - out.flush(); - out.close(); - keySet.clear(); - collector.clear(); - - FSDataOutputStream rootOut = null; - /* root index creating phase */ - if (this.level == TWO_LEVEL_INDEX) { - TreeMap<Tuple, LinkedList<Long>> rootMap = rootCollector.getMap(); - keySet = rootMap.keySet(); - - rootOut = fs.create(new Path(fileName + ".root")); - rootOut.writeInt(this.loadNum); - rootOut.writeInt(keySet.size()); - - /* root key writing */ - for (Tuple key : keySet) { - byte[] buf = rowStoreEncoder.toBytes(key); - rootOut.writeInt(buf.length); - rootOut.write(buf); - - LinkedList<Long> offsetList = rootMap.get(key); - if (offsetList.size() > 1 || offsetList.size() == 0) { - throw new IOException("Why root index doen't have one offset?"); - } - rootOut.writeLong(offsetList.getFirst()); - - } - rootOut.flush(); - rootOut.close(); - - keySet.clear(); - rootCollector.clear(); - } - } - - private class KeyOffsetCollector { - private TreeMap<Tuple, LinkedList<Long>> map; - - public KeyOffsetCollector(TupleComparator comparator) { - map = new TreeMap<Tuple, LinkedList<Long>>(comparator); - } - - public void put(Tuple key, long offset) { - if (map.containsKey(key)) { - map.get(key).add(offset); - } else { - LinkedList<Long> list = new LinkedList<Long>(); - list.add(offset); - map.put(key, list); - } - } - - public TreeMap<Tuple, LinkedList<Long>> getMap() { - return this.map; - } - - public void clear() { - this.map.clear(); - } - } - } - - /** - * BSTIndexReader is thread-safe. 
- */ - public class BSTIndexReader implements OrderIndexReader , Closeable{ - private Path fileName; - private Schema keySchema; - private TupleComparator comparator; - - private FileSystem fs; - private FSDataInputStream indexIn; - private FSDataInputStream subIn; - - private int level; - private int entryNum; - private int loadNum = -1; - private Tuple firstKey; - private Tuple lastKey; - - // the cursors of BST - private int rootCursor; - private int keyCursor; - private int offsetCursor; - - // mutex - private final Object mutex = new Object(); - - private RowStoreDecoder rowStoreDecoder; - - /** - * - * @param fileName - * @param keySchema - * @param comparator - * @throws IOException - */ - public BSTIndexReader(final Path fileName, Schema keySchema, TupleComparator comparator) throws IOException { - this.fileName = fileName; - this.keySchema = keySchema; - this.comparator = comparator; - this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); - } - - public BSTIndexReader(final Path fileName) throws IOException { - this.fileName = fileName; - } - - public Schema getKeySchema() { - return this.keySchema; - } - - public TupleComparator getComparator() { - return this.comparator; - } - - private void readHeader() throws IOException { - // schema - int schemaByteSize = indexIn.readInt(); - byte [] schemaBytes = new byte[schemaByteSize]; - StorageUtil.readFully(indexIn, schemaBytes, 0, schemaByteSize); - - SchemaProto.Builder builder = SchemaProto.newBuilder(); - builder.mergeFrom(schemaBytes); - SchemaProto proto = builder.build(); - this.keySchema = new Schema(proto); - this.rowStoreDecoder = RowStoreUtil.createDecoder(keySchema); - - // comparator - int compByteSize = indexIn.readInt(); - byte [] compBytes = new byte[compByteSize]; - StorageUtil.readFully(indexIn, compBytes, 0, compByteSize); - - TupleComparatorProto.Builder compProto = TupleComparatorProto.newBuilder(); - compProto.mergeFrom(compBytes); - this.comparator = new 
BaseTupleComparator(compProto.build()); - - // level - this.level = indexIn.readInt(); - // entry - this.entryNum = indexIn.readInt(); - if (entryNum > 0) { // if there is no any entry, do not read firstKey/lastKey values - byte [] minBytes = new byte[indexIn.readInt()]; - StorageUtil.readFully(indexIn, minBytes, 0, minBytes.length); - this.firstKey = rowStoreDecoder.toTuple(minBytes); - - byte [] maxBytes = new byte[indexIn.readInt()]; - StorageUtil.readFully(indexIn, maxBytes, 0, maxBytes.length); - this.lastKey = rowStoreDecoder.toTuple(maxBytes); - } - } - - public void open() - throws IOException { - /* init the index file */ - fs = fileName.getFileSystem(conf); - if (!fs.exists(fileName)) { - throw new FileNotFoundException("ERROR: does not exist " + fileName.toString()); - } - - indexIn = fs.open(this.fileName); - readHeader(); - fillData(); - } - - private void fillData() throws IOException { - /* load on memory */ - if (this.level == TWO_LEVEL_INDEX) { - - Path rootPath = new Path(this.fileName + ".root"); - if (!fs.exists(rootPath)) { - throw new FileNotFoundException("root index did not created"); - } - - subIn = indexIn; - indexIn = fs.open(rootPath); - /* root index header reading : type => loadNum => indexSize */ - this.loadNum = indexIn.readInt(); - this.entryNum = indexIn.readInt(); - /**/ - fillRootIndex(entryNum, indexIn); - - } else { - fillLeafIndex(entryNum, indexIn, -1); - } - } - - /** - * - * @return - * @throws IOException - */ - public long find(Tuple key) throws IOException { - return find(key, false); - } - - @Override - public long find(Tuple key, boolean nextKey) throws IOException { - synchronized (mutex) { - int pos = -1; - if (this.level == ONE_LEVEL_INDEX) { - pos = oneLevBS(key); - } else if (this.level == TWO_LEVEL_INDEX) { - pos = twoLevBS(key, this.loadNum + 1); - } else { - throw new IOException("More than TWL_LEVEL_INDEX is not supported."); - } - - if (nextKey) { - if (pos + 1 >= this.offsetSubIndex.length) { - return -1; - 
} - keyCursor = pos + 1; - offsetCursor = 0; - } else { - if (correctable) { - keyCursor = pos; - offsetCursor = 0; - } else { - return -1; - } - } - - return this.offsetSubIndex[keyCursor][offsetCursor]; - } - } - - public long next() throws IOException { - synchronized (mutex) { - if (offsetSubIndex[keyCursor].length - 1 > offsetCursor) { - offsetCursor++; - } else { - if (offsetSubIndex.length - 1 > keyCursor) { - keyCursor++; - offsetCursor = 0; - } else { - if (offsetIndex.length -1 > rootCursor) { - rootCursor++; - fillLeafIndex(loadNum + 1, subIn, this.offsetIndex[rootCursor]); - keyCursor = 1; - offsetCursor = 0; - } else { - return -1; - } - } - } - - return this.offsetSubIndex[keyCursor][offsetCursor]; - } - } - - public boolean isCurInMemory() { - return (offsetSubIndex[keyCursor].length - 1 >= offsetCursor); - } - - private void fillLeafIndex(int entryNum, FSDataInputStream in, long pos) - throws IOException { - int counter = 0; - try { - if (pos != -1) { - in.seek(pos); - } - this.dataSubIndex = new Tuple[entryNum]; - this.offsetSubIndex = new long[entryNum][]; - - byte[] buf; - for (int i = 0; i < entryNum; i++) { - counter++; - buf = new byte[in.readInt()]; - StorageUtil.readFully(in, buf, 0, buf.length); - dataSubIndex[i] = rowStoreDecoder.toTuple(buf); - - int offsetNum = in.readInt(); - this.offsetSubIndex[i] = new long[offsetNum]; - for (int j = 0; j < offsetNum; j++) { - this.offsetSubIndex[i][j] = in.readLong(); - } - - } - - } catch (IOException e) { - counter--; - if (pos != -1) { - in.seek(pos); - } - this.dataSubIndex = new Tuple[counter]; - this.offsetSubIndex = new long[counter][]; - - byte[] buf; - for (int i = 0; i < counter; i++) { - buf = new byte[in.readInt()]; - StorageUtil.readFully(in, buf, 0, buf.length); - dataSubIndex[i] = rowStoreDecoder.toTuple(buf); - - int offsetNum = in.readInt(); - this.offsetSubIndex[i] = new long[offsetNum]; - for (int j = 0; j < offsetNum; j++) { - this.offsetSubIndex[i][j] = in.readLong(); - } - - } - 
} - } - - public Tuple getFirstKey() { - return this.firstKey; - } - - public Tuple getLastKey() { - return this.lastKey; - } - - private void fillRootIndex(int entryNum, FSDataInputStream in) - throws IOException { - this.dataIndex = new Tuple[entryNum]; - this.offsetIndex = new long[entryNum]; - Tuple keyTuple; - byte[] buf; - for (int i = 0; i < entryNum; i++) { - buf = new byte[in.readInt()]; - StorageUtil.readFully(in, buf, 0, buf.length); - keyTuple = rowStoreDecoder.toTuple(buf); - dataIndex[i] = keyTuple; - this.offsetIndex[i] = in.readLong(); - } - } - - /* memory index, only one is used. */ - private Tuple[] dataIndex = null; - private Tuple[] dataSubIndex = null; - - /* offset index */ - private long[] offsetIndex = null; - private long[][] offsetSubIndex = null; - - private boolean correctable = true; - - private int oneLevBS(Tuple key) throws IOException { - correctable = true; - int pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); - return pos; - } - - private int twoLevBS(Tuple key, int loadNum) throws IOException { - int pos = binarySearch(this.dataIndex, key, 0, this.dataIndex.length); - if(pos > 0) { - rootCursor = pos; - } else { - rootCursor = 0; - } - fillLeafIndex(loadNum, subIn, this.offsetIndex[rootCursor]); - pos = binarySearch(this.dataSubIndex, key, 0, this.dataSubIndex.length); - - return pos; - } - - private int binarySearch(Tuple[] arr, Tuple key, int startPos, int endPos) { - int offset = -1; - int start = startPos; - int end = endPos; - - //http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=6412541 - int centerPos = (start + end) >>> 1; - while (true) { - if (comparator.compare(arr[centerPos], key) > 0) { - if (centerPos == 0) { - correctable = false; - break; - } else if (comparator.compare(arr[centerPos - 1], key) < 0) { - correctable = false; - offset = centerPos - 1; - break; - } else { - end = centerPos; - centerPos = (start + end) / 2; - } - } else if (comparator.compare(arr[centerPos], key) < 0) { - if 
(centerPos == arr.length - 1) { - correctable = false; - offset = centerPos; - break; - } else if (comparator.compare(arr[centerPos + 1], key) > 0) { - correctable = false; - offset = centerPos; - break; - } else { - start = centerPos + 1; - centerPos = (start + end) / 2; - } - } else { - correctable = true; - offset = centerPos; - break; - } - } - return offset; - } - - @Override - public void close() throws IOException { - this.indexIn.close(); - this.subIn.close(); - } - - @Override - public String toString() { - return "BSTIndex (" + firstKey + ", " + lastKey + ") " + fileName; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java deleted file mode 100644 index dfe36f6..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineDeserializer.java +++ /dev/null @@ -1,225 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.json; - - -import io.netty.buffer.ByteBuf; -import net.minidev.json.JSONArray; -import net.minidev.json.JSONObject; -import net.minidev.json.parser.JSONParser; -import net.minidev.json.parser.ParseException; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.common.exception.NotImplementedException; -import org.apache.tajo.datum.DatumFactory; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.text.TextLineDeserializer; -import org.apache.tajo.storage.text.TextLineParsingError; - -import java.io.IOException; -import java.util.Iterator; - -public class JsonLineDeserializer extends TextLineDeserializer { - private JSONParser parser; - private Type [] types; - private String [] columnNames; - - public JsonLineDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - super(schema, meta, targetColumnIndexes); - } - - @Override - public void init() { - types = SchemaUtil.toTypes(schema); - columnNames = SchemaUtil.toSimpleNames(schema); - - parser = new JSONParser(JSONParser.MODE_JSON_SIMPLE); - } - - @Override - public void deserialize(ByteBuf buf, Tuple output) throws IOException, TextLineParsingError { - byte [] line = new byte[buf.readableBytes()]; - buf.readBytes(line); - - try { - JSONObject object = (JSONObject) parser.parse(line); - - for (int i = 0; i < targetColumnIndexes.length; i++) { - int actualIdx = targetColumnIndexes[i]; - String fieldName = columnNames[actualIdx]; - - if (!object.containsKey(fieldName)) { - output.put(actualIdx, NullDatum.get()); - continue; - } - - switch (types[actualIdx]) { - case BOOLEAN: - String boolStr = object.getAsString(fieldName); - if (boolStr != null) { - 
output.put(actualIdx, DatumFactory.createBool(boolStr.equals("true"))); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case CHAR: - String charStr = object.getAsString(fieldName); - if (charStr != null) { - output.put(actualIdx, DatumFactory.createChar(charStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case INT1: - case INT2: - Number int2Num = object.getAsNumber(fieldName); - if (int2Num != null) { - output.put(actualIdx, DatumFactory.createInt2(int2Num.shortValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case INT4: - Number int4Num = object.getAsNumber(fieldName); - if (int4Num != null) { - output.put(actualIdx, DatumFactory.createInt4(int4Num.intValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case INT8: - Number int8Num = object.getAsNumber(fieldName); - if (int8Num != null) { - output.put(actualIdx, DatumFactory.createInt8(int8Num.longValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case FLOAT4: - Number float4Num = object.getAsNumber(fieldName); - if (float4Num != null) { - output.put(actualIdx, DatumFactory.createFloat4(float4Num.floatValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case FLOAT8: - Number float8Num = object.getAsNumber(fieldName); - if (float8Num != null) { - output.put(actualIdx, DatumFactory.createFloat8(float8Num.doubleValue())); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case TEXT: - String textStr = object.getAsString(fieldName); - if (textStr != null) { - output.put(actualIdx, DatumFactory.createText(textStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case TIMESTAMP: - String timestampStr = object.getAsString(fieldName); - if (timestampStr != null) { - output.put(actualIdx, DatumFactory.createTimestamp(timestampStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case TIME: - String timeStr = 
object.getAsString(fieldName); - if (timeStr != null) { - output.put(actualIdx, DatumFactory.createTime(timeStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case DATE: - String dateStr = object.getAsString(fieldName); - if (dateStr != null) { - output.put(actualIdx, DatumFactory.createDate(dateStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - case BIT: - case BINARY: - case VARBINARY: - case BLOB: { - Object jsonObject = object.get(fieldName); - - if (jsonObject == null) { - output.put(actualIdx, NullDatum.get()); - break; - } - if (jsonObject instanceof String) { - output.put(actualIdx, DatumFactory.createBlob((String) jsonObject)); - } else if (jsonObject instanceof JSONArray) { - JSONArray jsonArray = (JSONArray) jsonObject; - byte[] bytes = new byte[jsonArray.size()]; - Iterator<Object> it = jsonArray.iterator(); - int arrayIdx = 0; - while (it.hasNext()) { - bytes[arrayIdx++] = ((Long) it.next()).byteValue(); - } - if (bytes.length > 0) { - output.put(actualIdx, DatumFactory.createBlob(bytes)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - } else { - throw new IOException("Unknown json object: " + object.getClass().getSimpleName()); - } - break; - } - case INET4: - String inetStr = object.getAsString(fieldName); - if (inetStr != null) { - output.put(actualIdx, DatumFactory.createInet4(inetStr)); - } else { - output.put(actualIdx, NullDatum.get()); - } - break; - - case NULL_TYPE: - output.put(actualIdx, NullDatum.get()); - break; - - default: - throw new NotImplementedException(types[actualIdx].name() + " is not supported."); - } - } - } catch (ParseException pe) { - throw new TextLineParsingError(new String(line, TextDatum.DEFAULT_CHARSET), pe); - } catch (Throwable e) { - throw new IOException(e); - } - } - - @Override - public void release() { - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java 
---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java deleted file mode 100644 index 6db2c29..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerDe.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.json; - -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.text.TextLineDeserializer; -import org.apache.tajo.storage.text.TextLineSerDe; -import org.apache.tajo.storage.text.TextLineSerializer; - -public class JsonLineSerDe extends TextLineSerDe { - @Override - public TextLineDeserializer createDeserializer(Schema schema, TableMeta meta, int[] targetColumnIndexes) { - return new JsonLineDeserializer(schema, meta, targetColumnIndexes); - } - - @Override - public TextLineSerializer createSerializer(Schema schema, TableMeta meta) { - return new JsonLineSerializer(schema, meta); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java deleted file mode 100644 index c7007d8..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/json/JsonLineSerializer.java +++ /dev/null @@ -1,134 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.json; - - -import net.minidev.json.JSONObject; -import org.apache.commons.lang.CharSet; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SchemaUtil; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.common.exception.NotImplementedException; -import org.apache.tajo.datum.ProtobufDatum; -import org.apache.tajo.datum.TextDatum; -import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.text.TextLineSerDe; -import org.apache.tajo.storage.text.TextLineSerializer; - -import java.io.IOException; -import java.io.OutputStream; -import java.nio.charset.Charset; - -public class JsonLineSerializer extends TextLineSerializer { - private static ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - - private Type [] types; - private String [] simpleNames; - private int columnNum; - - - public JsonLineSerializer(Schema schema, TableMeta meta) { - super(schema, meta); - } - - @Override - public void init() { - types = SchemaUtil.toTypes(schema); - simpleNames = SchemaUtil.toSimpleNames(schema); - columnNum = schema.size(); - } - - @Override - public int serialize(OutputStream out, Tuple input) throws IOException { - JSONObject jsonObject = new JSONObject(); - - for (int i = 0; i < columnNum; i++) { - if (input.isNull(i)) { - continue; - } - - String fieldName = simpleNames[i]; - Type type = types[i]; - - switch (type) { - - case BOOLEAN: - jsonObject.put(fieldName, input.getBool(i)); - break; - - case INT1: - case INT2: - jsonObject.put(fieldName, input.getInt2(i)); - break; - - case INT4: - jsonObject.put(fieldName, input.getInt4(i)); - break; - - case INT8: - jsonObject.put(fieldName, input.getInt8(i)); - break; - - case FLOAT4: - 
jsonObject.put(fieldName, input.getFloat4(i)); - break; - - case FLOAT8: - jsonObject.put(fieldName, input.getFloat8(i)); - break; - - case CHAR: - case TEXT: - case VARCHAR: - case INET4: - case TIMESTAMP: - case DATE: - case TIME: - case INTERVAL: - jsonObject.put(fieldName, input.getText(i)); - break; - - case BIT: - case BINARY: - case BLOB: - case VARBINARY: - jsonObject.put(fieldName, input.getBytes(i)); - break; - - case NULL_TYPE: - break; - - default: - throw new NotImplementedException(types[i].name() + " is not supported."); - } - } - - String jsonStr = jsonObject.toJSONString(); - byte [] jsonBytes = jsonStr.getBytes(TextDatum.DEFAULT_CHARSET); - out.write(jsonBytes); - return jsonBytes.length; - } - - @Override - public void release() { - - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java deleted file mode 100644 index 3a3bb57..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetAppender.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import org.apache.tajo.storage.StorageConstants; -import parquet.hadoop.ParquetOutputFormat; -import parquet.hadoop.metadata.CompressionCodecName; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.storage.FileAppender; -import org.apache.tajo.storage.TableStatistics; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -/** - * FileAppender for writing to Parquet files. - */ -public class ParquetAppender extends FileAppender { - private TajoParquetWriter writer; - private int blockSize; - private int pageSize; - private CompressionCodecName compressionCodecName; - private boolean enableDictionary; - private boolean validating; - private TableStatistics stats; - - /** - * Creates a new ParquetAppender. - * - * @param conf Configuration properties. - * @param schema The table schema. - * @param meta The table metadata. - * @param path The path of the Parquet file to write to. 
- */ - public ParquetAppender(Configuration conf, Schema schema, TableMeta meta, - Path path) throws IOException { - super(conf, schema, meta, path); - this.blockSize = Integer.parseInt( - meta.getOption(ParquetOutputFormat.BLOCK_SIZE, StorageConstants.PARQUET_DEFAULT_BLOCK_SIZE)); - this.pageSize = Integer.parseInt( - meta.getOption(ParquetOutputFormat.PAGE_SIZE, StorageConstants.PARQUET_DEFAULT_PAGE_SIZE)); - this.compressionCodecName = CompressionCodecName.fromConf( - meta.getOption(ParquetOutputFormat.COMPRESSION, StorageConstants.PARQUET_DEFAULT_COMPRESSION_CODEC_NAME)); - this.enableDictionary = Boolean.parseBoolean( - meta.getOption(ParquetOutputFormat.ENABLE_DICTIONARY, StorageConstants.PARQUET_DEFAULT_IS_DICTIONARY_ENABLED)); - this.validating = Boolean.parseBoolean( - meta.getOption(ParquetOutputFormat.VALIDATION, StorageConstants.PARQUET_DEFAULT_IS_VALIDATION_ENABLED)); - } - - /** - * Initializes the Appender. This method creates a new TajoParquetWriter - * and initializes the table statistics if enabled. - */ - public void init() throws IOException { - writer = new TajoParquetWriter(path, - schema, - compressionCodecName, - blockSize, - pageSize, - enableDictionary, - validating); - if (enabledStats) { - this.stats = new TableStatistics(schema); - } - super.init(); - } - - /** - * Gets the current offset. Tracking offsets is currenly not implemented, so - * this method always returns 0. - * - * @return 0 - */ - @Override - public long getOffset() throws IOException { - return 0; - } - - /** - * Write a Tuple to the Parquet file. - * - * @param tuple The Tuple to write. - */ - @Override - public void addTuple(Tuple tuple) throws IOException { - if (enabledStats) { - for (int i = 0; i < schema.size(); ++i) { - stats.analyzeField(i, tuple.get(i)); - } - } - writer.write(tuple); - if (enabledStats) { - stats.incrementRow(); - } - } - - /** - * The ParquetWriter does not need to be flushed, so this is a no-op. 
- */ - @Override - public void flush() throws IOException { - } - - /** - * Closes the Appender. - */ - @Override - public void close() throws IOException { - writer.close(); - } - - public long getEstimatedOutputSize() throws IOException { - return writer.getEstimatedWrittenSize(); - } - - /** - * If table statistics is enabled, retrieve the table statistics. - * - * @return Table statistics if enabled or null otherwise. - */ - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java deleted file mode 100644 index 36b89b8..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java +++ /dev/null @@ -1,119 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.FileScanner; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.IOException; - -/** - * FileScanner for reading Parquet files - */ -public class ParquetScanner extends FileScanner { - private TajoParquetReader reader; - - /** - * Creates a new ParquetScanner. - * - * @param conf - * @param schema - * @param meta - * @param fragment - */ - public ParquetScanner(Configuration conf, final Schema schema, - final TableMeta meta, final FileFragment fragment) { - super(conf, schema, meta, fragment); - } - - /** - * Initializes the ParquetScanner. This method initializes the - * TajoParquetReader. - */ - @Override - public void init() throws IOException { - if (targets == null) { - targets = schema.toArray(); - } - reader = new TajoParquetReader(fragment.getPath(), schema, new Schema(targets)); - super.init(); - } - - /** - * Reads the next Tuple from the Parquet file. - * - * @return The next Tuple from the Parquet file or null if end of file is - * reached. - */ - @Override - public Tuple next() throws IOException { - return reader.read(); - } - - /** - * Resets the scanner - */ - @Override - public void reset() throws IOException { - } - - /** - * Closes the scanner. - */ - @Override - public void close() throws IOException { - if (reader != null) { - reader.close(); - } - } - - /** - * Returns whether this scanner is projectable. - * - * @return true - */ - @Override - public boolean isProjectable() { - return true; - } - - /** - * Returns whether this scanner is selectable. - * - * @return false - */ - @Override - public boolean isSelectable() { - return false; - } - - /** - * Returns whether this scanner is splittable. 
- * - * @return false - */ - @Override - public boolean isSplittable() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/4561711f/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java deleted file mode 100644 index a765f48..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoParquetReader.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.parquet; - -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.storage.Tuple; -import org.apache.tajo.storage.thirdparty.parquet.ParquetReader; -import parquet.filter.UnboundRecordFilter; - -import java.io.IOException; - -/** - * Tajo implementation of {@link ParquetReader} to read Tajo records from a - * Parquet file. Users should use {@link ParquetScanner} and not this class - * directly. 
/**
 * Tajo implementation of {@link ParquetReader} to read Tajo records from a
 * Parquet file. Users should use {@link ParquetScanner} and not this class
 * directly. All constructors simply delegate to the matching
 * {@link ParquetReader} constructor with a {@link TajoReadSupport}.
 */
public class TajoParquetReader extends ParquetReader<Tuple> {
  /**
   * Creates a new TajoParquetReader with no projection and no filter.
   *
   * @param file The file to read from.
   * @param readSchema Tajo schema of the table.
   * @throws IOException propagated from {@link ParquetReader}.
   */
  public TajoParquetReader(Path file, Schema readSchema) throws IOException {
    super(file, new TajoReadSupport(readSchema));
  }

  /**
   * Creates a new TajoParquetReader that projects onto requestedSchema.
   *
   * @param file The file to read from.
   * @param readSchema Tajo schema of the table.
   * @param requestedSchema Tajo schema of the projection.
   * @throws IOException propagated from {@link ParquetReader}.
   */
  public TajoParquetReader(Path file, Schema readSchema,
                           Schema requestedSchema) throws IOException {
    super(file, new TajoReadSupport(readSchema, requestedSchema));
  }

  /**
   * Creates a new TajoParquetReader with a record filter and no projection.
   *
   * @param file The file to read from.
   * @param readSchema Tajo schema of the table.
   * @param recordFilter Record filter.
   * @throws IOException propagated from {@link ParquetReader}.
   */
  public TajoParquetReader(Path file, Schema readSchema,
                          UnboundRecordFilter recordFilter)
      throws IOException {
    super(file, new TajoReadSupport(readSchema), recordFilter);
  }

  /**
   * Creates a new TajoParquetReader with both a projection and a filter.
   *
   * @param file The file to read from.
   * @param readSchema Tajo schema of the table.
   * @param requestedSchema Tajo schema of the projection.
   * @param recordFilter Record filter.
   * @throws IOException propagated from {@link ParquetReader}.
   */
  public TajoParquetReader(Path file, Schema readSchema,
                          Schema requestedSchema,
                          UnboundRecordFilter recordFilter)
      throws IOException {
    super(file, new TajoReadSupport(readSchema, requestedSchema),
          recordFilter);
  }
}
