http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java deleted file mode 100644 index 16c4faa..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Scanner.java +++ /dev/null @@ -1,103 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.SchemaObject; -import org.apache.tajo.catalog.statistics.TableStats; - -import java.io.Closeable; -import java.io.IOException; - -/** - * Scanner Interface - */ -public interface Scanner extends SchemaObject, Closeable { - - void init() throws IOException; - - /** - * It returns one tuple at each call. - * - * @return retrieve null if the scanner has no more tuples. - * Otherwise it returns one tuple. - * - * @throws IOException if internal I/O error occurs during next method - */ - Tuple next() throws IOException; - - /** - * Reset the cursor. 
After executed, the scanner - * will retrieve the first tuple. - * - * @throws IOException if internal I/O error occurs during reset method - */ - void reset() throws IOException; - - /** - * Close scanner - * - * @throws IOException if internal I/O error occurs during close method - */ - void close() throws IOException; - - - /** - * It returns if the projection is executed in the underlying scanner layer. - * - * @return true if this scanner can project the given columns. - */ - boolean isProjectable(); - - /** - * Set target columns - * @param targets columns to be projected - */ - void setTarget(Column [] targets); - - /** - * It returns if the selection is executed in the underlying scanner layer. - * - * @return true if this scanner can filter tuples against a given condition. - */ - boolean isSelectable(); - - /** - * Set a search condition - * @param expr to be searched - * - * TODO - to be changed Object type - */ - void setSearchCondition(Object expr); - - /** - * It returns if the file is splittable. - * - * @return true if this scanner can split the a file. - */ - boolean isSplittable(); - - /** - * How much of the input has the Scanner consumed - * @return progress from <code>0.0</code> to <code>1.0</code>. - */ - float getProgress(); - - TableStats getInputStats(); -}
http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java deleted file mode 100644 index 894e7ee..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SeekableScanner.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import java.io.IOException; - -public interface SeekableScanner extends Scanner { - - public abstract long getNextOffset() throws IOException; - - public abstract void seek(long offset) throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java deleted file mode 100644 index 564a9f5..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SerializerDeserializer.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.tajo.catalog.Column; -import org.apache.tajo.datum.Datum; - -import java.io.IOException; -import java.io.OutputStream; - -@Deprecated -public interface SerializerDeserializer { - - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException; - - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException; - -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java deleted file mode 100644 index 3579674..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/SplitLineReader.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; - -import java.io.IOException; -import java.io.InputStream; - -public class SplitLineReader extends LineReader { - public SplitLineReader(InputStream in, byte[] recordDelimiterBytes) { - super(in, recordDelimiterBytes); - } - - public SplitLineReader(InputStream in, Configuration conf, - byte[] recordDelimiterBytes) throws IOException { - super(in, conf, recordDelimiterBytes); - } - - public boolean needAdditionalRecordAfterSplit() { - return false; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java b/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java deleted file mode 100644 index cc85c1d..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/Storage.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.storage.fragment.FileFragment; - -import java.io.IOException; - -public abstract class Storage { - protected final Configuration conf; - - public Storage(final Configuration conf) { - this.conf = conf; - } - - public Configuration getConf() { - return this.conf; - } - - public abstract Appender getAppender(TableMeta meta, Path path) - throws IOException; - - public abstract Scanner openScanner(Schema schema, FileFragment[] tablets) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java deleted file mode 100644 index e37be58..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageManager.java +++ /dev/null @@ -1,926 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.*; -import org.apache.tajo.*; -import org.apache.tajo.catalog.*; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.FragmentProto; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.logical.*; -import org.apache.tajo.plan.rewrite.RewriteRule; -import org.apache.tajo.storage.fragment.FileFragment; -import org.apache.tajo.storage.fragment.Fragment; -import org.apache.tajo.storage.fragment.FragmentConvertor; -import org.apache.tajo.storage.hbase.HBaseStorageManager; -import org.apache.tajo.util.TUtil; - -import java.io.IOException; -import java.lang.reflect.Constructor; -import java.net.URI; -import java.text.NumberFormat; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; - -/** - * StorageManager manages the functions of storing and reading data. - * StorageManager is a abstract class. - * For supporting such as HDFS, HBASE, a specific StorageManager should be implemented by inheriting this class. 
- * - */ -public abstract class StorageManager { - private final Log LOG = LogFactory.getLog(StorageManager.class); - - private static final Class<?>[] DEFAULT_SCANNER_PARAMS = { - Configuration.class, - Schema.class, - TableMeta.class, - Fragment.class - }; - - private static final Class<?>[] DEFAULT_APPENDER_PARAMS = { - Configuration.class, - QueryUnitAttemptId.class, - Schema.class, - TableMeta.class, - Path.class - }; - - protected TajoConf conf; - protected StoreType storeType; - - /** - * Cache of StorageManager. - * Key is manager key(warehouse path) + store type - */ - private static final Map<String, StorageManager> storageManagers = Maps.newHashMap(); - - /** - * Cache of scanner handlers for each storage type. - */ - protected static final Map<String, Class<? extends Scanner>> SCANNER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Scanner>>(); - - /** - * Cache of appender handlers for each storage type. - */ - protected static final Map<String, Class<? extends Appender>> APPENDER_HANDLER_CACHE - = new ConcurrentHashMap<String, Class<? extends Appender>>(); - - /** - * Cache of constructors for each class. Pins the classes so they - * can't be garbage collected until ReflectionUtils can be collected. - */ - private static final Map<Class<?>, Constructor<?>> CONSTRUCTOR_CACHE = - new ConcurrentHashMap<Class<?>, Constructor<?>>(); - - public StorageManager(StoreType storeType) { - this.storeType = storeType; - } - - /** - * Initialize storage manager. - * @throws IOException - */ - protected abstract void storageInit() throws IOException; - - /** - * This method is called after executing "CREATE TABLE" statement. - * If a storage is a file based storage, a storage manager may create directory. - * - * @param tableDesc Table description which is created. - * @param ifNotExists Creates the table only when the table does not exist. 
- * @throws IOException - */ - public abstract void createTable(TableDesc tableDesc, boolean ifNotExists) throws IOException; - - /** - * This method is called after executing "DROP TABLE" statement with the 'PURGE' option - * which is the option to delete all the data. - * - * @param tableDesc - * @throws IOException - */ - public abstract void purgeTable(TableDesc tableDesc) throws IOException; - - /** - * Returns the splits that will serve as input for the scan tasks. The - * number of splits matches the number of regions in a table. - * @param fragmentId The table name or previous ExecutionBlockId - * @param tableDesc The table description for the target data. - * @param scanNode The logical node for scanning. - * @return The list of input fragments. - * @throws IOException - */ - public abstract List<Fragment> getSplits(String fragmentId, TableDesc tableDesc, - ScanNode scanNode) throws IOException; - - /** - * It returns the splits that will serve as input for the non-forward query scanner such as 'select * from table1'. - * The result list should be small. If there is many fragments for scanning, TajoMaster uses the paging navigation. - * @param tableDesc The table description for the target data. - * @param currentPage The current page number within the entire list. - * @param numFragments The number of fragments in the result. - * @return The list of input fragments. - * @throws IOException - */ - public abstract List<Fragment> getNonForwardSplit(TableDesc tableDesc, int currentPage, int numFragments) - throws IOException; - - /** - * It returns the storage property. - * @return The storage property - */ - public abstract StorageProperty getStorageProperty(); - - /** - * Release storage manager resource - */ - public abstract void closeStorageManager(); - - /** - * It is called by a Repartitioner for range shuffling when the SortRangeType of SortNode is STORAGE_SPECIFIED. 
- * In general Repartitioner determines the partition range using previous output statistics data. - * In the special cases, such as HBase Repartitioner uses the result of this method. - * - * @param queryContext The current query context which contains query properties. - * @param tableDesc The table description for the target data. - * @param inputSchema The input schema - * @param sortSpecs The sort specification that contains the sort column and sort order. - * @return The list of sort ranges. - * @throws IOException - */ - public abstract TupleRange[] getInsertSortRanges(OverridableConf queryContext, TableDesc tableDesc, - Schema inputSchema, SortSpec[] sortSpecs, - TupleRange dataRange) throws IOException; - - /** - * This method is called before executing 'INSERT' or 'CREATE TABLE as SELECT'. - * In general Tajo creates the target table after finishing the final sub-query of CATS. - * But In the special cases, such as HBase INSERT or CAST query uses the target table information. - * That kind of the storage should implements the logic related to creating table in this method. - * - * @param node The child node of the root node. - * @throws IOException - */ - public abstract void beforeInsertOrCATS(LogicalNode node) throws IOException; - - /** - * It is called when the query failed. - * Each storage manager should implement to be processed when the query fails in this method. - * - * @param node The child node of the root node. - * @throws IOException - */ - public abstract void rollbackOutputCommit(LogicalNode node) throws IOException; - - /** - * Returns the current storage type. - * @return - */ - public StoreType getStoreType() { - return storeType; - } - - /** - * Initialize StorageManager instance. It should be called before using. 
- * - * @param tajoConf - * @throws IOException - */ - public void init(TajoConf tajoConf) throws IOException { - this.conf = tajoConf; - storageInit(); - } - - /** - * Close StorageManager - * @throws IOException - */ - public void close() throws IOException { - synchronized(storageManagers) { - for (StorageManager eachStorageManager: storageManagers.values()) { - eachStorageManager.closeStorageManager(); - } - } - } - - /** - * Returns the splits that will serve as input for the scan tasks. The - * number of splits matches the number of regions in a table. - * - * @param fragmentId The table name or previous ExecutionBlockId - * @param tableDesc The table description for the target data. - * @return The list of input fragments. - * @throws IOException - */ - public List<Fragment> getSplits(String fragmentId, TableDesc tableDesc) throws IOException { - return getSplits(fragmentId, tableDesc, null); - } - - /** - * Returns FileStorageManager instance. - * - * @param tajoConf Tajo system property. - * @return - * @throws IOException - */ - public static FileStorageManager getFileStorageManager(TajoConf tajoConf) throws IOException { - return getFileStorageManager(tajoConf, null); - } - - /** - * Returns FileStorageManager instance and sets WAREHOUSE_DIR property in tajoConf with warehousePath parameter. - * - * @param tajoConf Tajo system property. - * @param warehousePath The warehouse directory to be set in the tajoConf. - * @return - * @throws IOException - */ - public static FileStorageManager getFileStorageManager(TajoConf tajoConf, Path warehousePath) throws IOException { - URI uri; - TajoConf copiedConf = new TajoConf(tajoConf); - if (warehousePath != null) { - copiedConf.setVar(ConfVars.WAREHOUSE_DIR, warehousePath.toUri().toString()); - } - uri = TajoConf.getWarehouseDir(copiedConf).toUri(); - String key = "file".equals(uri.getScheme()) ? 
"file" : uri.toString(); - return (FileStorageManager) getStorageManager(copiedConf, StoreType.CSV, key); - } - - /** - * Returns the proper StorageManager instance according to the storeType. - * - * @param tajoConf Tajo system property. - * @param storeType Storage type - * @return - * @throws IOException - */ - public static StorageManager getStorageManager(TajoConf tajoConf, String storeType) throws IOException { - if ("HBASE".equals(storeType)) { - return getStorageManager(tajoConf, StoreType.HBASE); - } else { - return getStorageManager(tajoConf, StoreType.CSV); - } - } - - /** - * Returns the proper StorageManager instance according to the storeType. - * - * @param tajoConf Tajo system property. - * @param storeType Storage type - * @return - * @throws IOException - */ - public static StorageManager getStorageManager(TajoConf tajoConf, StoreType storeType) throws IOException { - return getStorageManager(tajoConf, storeType, null); - } - - /** - * Returns the proper StorageManager instance according to the storeType - * - * @param tajoConf Tajo system property. - * @param storeType Storage type - * @param managerKey Key that can identify each storage manager(may be a path) - * @return - * @throws IOException - */ - public static synchronized StorageManager getStorageManager ( - TajoConf tajoConf, StoreType storeType, String managerKey) throws IOException { - synchronized (storageManagers) { - String storeKey = storeType + managerKey; - StorageManager manager = storageManagers.get(storeKey); - if (manager == null) { - switch (storeType) { - case HBASE: - manager = new HBaseStorageManager(storeType); - break; - default: - manager = new FileStorageManager(storeType); - } - - manager.init(tajoConf); - storageManagers.put(storeKey, manager); - } - - return manager; - } - } - - /** - * Returns Scanner instance. 
- * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target Columns which are selected. - * @return Scanner instance - * @throws IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, FragmentProto fragment, Schema target) throws IOException { - return getScanner(meta, schema, FragmentConvertor.convert(conf, fragment), target); - } - - /** - * Returns Scanner instance. - * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @return Scanner instance - * @throws IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment) throws IOException { - return getScanner(meta, schema, fragment, schema); - } - - /** - * Returns Scanner instance. - * - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target The output schema - * @return Scanner instance - * @throws IOException - */ - public Scanner getScanner(TableMeta meta, Schema schema, Fragment fragment, Schema target) throws IOException { - if (fragment.isEmpty()) { - Scanner scanner = new NullScanner(conf, schema, meta, fragment); - scanner.setTarget(target.toArray()); - - return scanner; - } - - Scanner scanner; - - Class<? extends Scanner> scannerClass = getScannerClass(meta.getStoreType()); - scanner = newScannerInstance(scannerClass, conf, schema, meta, fragment); - if (scanner.isProjectable()) { - scanner.setTarget(target.toArray()); - } - - return scanner; - } - - /** - * Returns Scanner instance. 
- * - * @param conf The system property - * @param meta The table meta - * @param schema The input schema - * @param fragment The fragment for scanning - * @param target The output schema - * @return Scanner instance - * @throws IOException - */ - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, FileFragment fragment, Schema target) throws IOException { - return (SeekableScanner)getStorageManager(conf, meta.getStoreType()).getScanner(meta, schema, fragment, target); - } - - /** - * Returns Scanner instance. - * - * @param conf The system property - * @param meta The table meta - * @param schema The input schema - * @param path The data file path - * @return Scanner instance - * @throws IOException - */ - public static synchronized SeekableScanner getSeekableScanner( - TajoConf conf, TableMeta meta, Schema schema, Path path) throws IOException { - - FileSystem fs = path.getFileSystem(conf); - FileStatus status = fs.getFileStatus(path); - FileFragment fragment = new FileFragment(path.getName(), path, 0, status.getLen()); - - return getSeekableScanner(conf, meta, schema, fragment, schema); - } - - /** - * Returns Appender instance. - * @param queryContext Query property. - * @param taskAttemptId Task id. - * @param meta Table meta data. - * @param schema Output schema. - * @param workDir Working directory - * @return Appender instance - * @throws IOException - */ - public Appender getAppender(OverridableConf queryContext, - QueryUnitAttemptId taskAttemptId, TableMeta meta, Schema schema, Path workDir) - throws IOException { - Appender appender; - - Class<? 
extends Appender> appenderClass; - - String handlerName = meta.getStoreType().name().toLowerCase(); - appenderClass = APPENDER_HANDLER_CACHE.get(handlerName); - if (appenderClass == null) { - appenderClass = conf.getClass( - String.format("tajo.storage.appender-handler.%s.class", - meta.getStoreType().name().toLowerCase()), null, Appender.class); - APPENDER_HANDLER_CACHE.put(handlerName, appenderClass); - } - - if (appenderClass == null) { - throw new IOException("Unknown Storage Type: " + meta.getStoreType()); - } - - appender = newAppenderInstance(appenderClass, conf, taskAttemptId, meta, schema, workDir); - - return appender; - } - - /** - * Creates a scanner instance. - * - * @param theClass Concrete class of scanner - * @param conf System property - * @param schema Input schema - * @param meta Table meta data - * @param fragment The fragment for scanning - * @param <T> - * @return The scanner instance - */ - public static <T> T newScannerInstance(Class<T> theClass, Configuration conf, Schema schema, TableMeta meta, - Fragment fragment) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_SCANNER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); - } - result = meth.newInstance(new Object[]{conf, schema, meta, fragment}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - /** - * Creates a scanner instance. 
- * - * @param theClass Concrete class of scanner - * @param conf System property - * @param taskAttemptId Task id - * @param meta Table meta data - * @param schema Input schema - * @param workDir Working directory - * @param <T> - * @return The scanner instance - */ - public static <T> T newAppenderInstance(Class<T> theClass, Configuration conf, QueryUnitAttemptId taskAttemptId, - TableMeta meta, Schema schema, Path workDir) { - T result; - try { - Constructor<T> meth = (Constructor<T>) CONSTRUCTOR_CACHE.get(theClass); - if (meth == null) { - meth = theClass.getDeclaredConstructor(DEFAULT_APPENDER_PARAMS); - meth.setAccessible(true); - CONSTRUCTOR_CACHE.put(theClass, meth); - } - result = meth.newInstance(new Object[]{conf, taskAttemptId, schema, meta, workDir}); - } catch (Exception e) { - throw new RuntimeException(e); - } - - return result; - } - - /** - * Return the Scanner class for the StoreType that is defined in storage-default.xml. - * - * @param storeType store type - * @return The Scanner class - * @throws IOException - */ - public Class<? extends Scanner> getScannerClass(CatalogProtos.StoreType storeType) throws IOException { - String handlerName = storeType.name().toLowerCase(); - Class<? extends Scanner> scannerClass = SCANNER_HANDLER_CACHE.get(handlerName); - if (scannerClass == null) { - scannerClass = conf.getClass( - String.format("tajo.storage.scanner-handler.%s.class",storeType.name().toLowerCase()), null, Scanner.class); - SCANNER_HANDLER_CACHE.put(handlerName, scannerClass); - } - - if (scannerClass == null) { - throw new IOException("Unknown Storage Type: " + storeType.name()); - } - - return scannerClass; - } - - /** - * Return length of the fragment. - * In the UNKNOWN_LENGTH case get FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH from the configuration. 
- * - * @param conf Tajo system property - * @param fragment Fragment - * @return - */ - public static long getFragmentLength(TajoConf conf, Fragment fragment) { - if (fragment.getLength() == TajoConstants.UNKNOWN_LENGTH) { - return conf.getLongVar(ConfVars.FRAGMENT_ALTERNATIVE_UNKNOWN_LENGTH); - } else { - return fragment.getLength(); - } - } - - /** - * It is called after making logical plan. Storage manager should verify the schema for inserting. - * - * @param tableDesc The table description of insert target. - * @param outSchema The output schema of select query for inserting. - * @throws IOException - */ - public void verifyInsertTableSchema(TableDesc tableDesc, Schema outSchema) throws IOException { - // nothing to do - } - - /** - * Returns the list of storage specified rewrite rules. - * This values are used by LogicalOptimizer. - * - * @param queryContext The query property - * @param tableDesc The description of the target table. - * @return The list of storage specified rewrite rules - * @throws IOException - */ - public List<RewriteRule> getRewriteRules(OverridableConf queryContext, TableDesc tableDesc) throws IOException { - return null; - } - - /** - * Finalizes result data. Tajo stores result data in the staging directory. - * If the query fails, clean up the staging directory. - * Otherwise the query is successful, move to the final directory from the staging directory. - * - * @param queryContext The query property - * @param finalEbId The final execution block id - * @param plan The query plan - * @param schema The final output schema - * @param tableDesc The description of the target table - * @return Saved path - * @throws IOException - */ - public Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, - LogicalPlan plan, Schema schema, - TableDesc tableDesc) throws IOException { - return commitOutputData(queryContext, finalEbId, plan, schema, tableDesc, true); - } - - /** - * Finalizes result data. 
Tajo stores result data in the staging directory. - * If the query fails, clean up the staging directory. - * Otherwise the query is successful, move to the final directory from the staging directory. - * - * @param queryContext The query property - * @param finalEbId The final execution block id - * @param plan The query plan - * @param schema The final output schema - * @param tableDesc The description of the target table - * @param changeFileSeq If true change result file name with max sequence. - * @return Saved path - * @throws IOException - */ - protected Path commitOutputData(OverridableConf queryContext, ExecutionBlockId finalEbId, - LogicalPlan plan, Schema schema, - TableDesc tableDesc, boolean changeFileSeq) throws IOException { - Path stagingDir = new Path(queryContext.get(QueryVars.STAGING_DIR)); - Path stagingResultDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - Path finalOutputDir; - if (!queryContext.get(QueryVars.OUTPUT_TABLE_PATH, "").isEmpty()) { - finalOutputDir = new Path(queryContext.get(QueryVars.OUTPUT_TABLE_PATH)); - FileSystem fs = stagingResultDir.getFileSystem(conf); - - if (queryContext.getBool(QueryVars.OUTPUT_OVERWRITE, false)) { // INSERT OVERWRITE INTO - - // It moves the original table into the temporary location. - // Then it moves the new result table into the original table location. - // Upon failed, it recovers the original table if possible. - boolean movedToOldTable = false; - boolean committed = false; - Path oldTableDir = new Path(stagingDir, TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - // This is a map for existing non-leaf directory to rename. A key is current directory and a value is - // renaming directory. - Map<Path, Path> renameDirs = TUtil.newHashMap(); - // This is a map for recovering existing partition directory. A key is current directory and a value is - // temporary directory to back up. 
- Map<Path, Path> recoveryDirs = TUtil.newHashMap(); - - try { - if (!fs.exists(finalOutputDir)) { - fs.mkdirs(finalOutputDir); - } - - visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), - renameDirs, oldTableDir); - - // Rename target partition directories - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - // Backup existing data files for recovering - if (fs.exists(entry.getValue())) { - String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), - oldTableDir.toString()); - Path recoveryPath = new Path(recoveryPathString); - fs.rename(entry.getValue(), recoveryPath); - fs.exists(recoveryPath); - recoveryDirs.put(entry.getValue(), recoveryPath); - } - // Delete existing directory - fs.delete(entry.getValue(), true); - // Rename staging directory to final output directory - fs.rename(entry.getKey(), entry.getValue()); - } - - } catch (IOException ioe) { - // Remove created dirs - for(Map.Entry<Path, Path> entry : renameDirs.entrySet()) { - fs.delete(entry.getValue(), true); - } - - // Recovery renamed dirs - for(Map.Entry<Path, Path> entry : recoveryDirs.entrySet()) { - fs.delete(entry.getValue(), true); - fs.rename(entry.getValue(), entry.getKey()); - } - throw new IOException(ioe.getMessage()); - } - } else { - try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. 
- fs.mkdirs(finalOutputDir.getParent()); - } - - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); - } - } - } - } else { - String queryType = queryContext.get(QueryVars.COMMAND_TYPE); - - if (queryType != null && queryType.equals(NodeType.INSERT.name())) { // INSERT INTO an existing table - - NumberFormat fmt = NumberFormat.getInstance(); - fmt.setGroupingUsed(false); - fmt.setMinimumIntegerDigits(3); - - if (!queryContext.get(QueryVars.OUTPUT_PARTITIONS, "").isEmpty()) { - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.isFile()) { - LOG.warn("Partition table can't have file in a staging dir: " + eachFile.getPath()); - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, -1, changeFileSeq); - } - } else { - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalOutputDir, false) + 1; - for(FileStatus eachFile: fs.listStatus(stagingResultDir)) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputDir, fmt, maxSeq++, changeFileSeq); - } - } - // checking all file moved and remove empty dir - verifyAllFileMoved(fs, stagingResultDir); - FileStatus[] files = fs.listStatus(stagingResultDir); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - } - } - } else { // CREATE TABLE AS SELECT (CTAS) - fs.rename(stagingResultDir, finalOutputDir); - LOG.info("Moved from the staging dir to the output directory '" + finalOutputDir); - } - } - } else { - finalOutputDir = new Path(stagingDir, TajoConstants.RESULT_DIR_NAME); - } - - return finalOutputDir; - } - - /** - * Attach the sequence number to the output file name and than move the file 
into the final result path. - * - * @param fs FileSystem - * @param stagingResultDir The staging result dir - * @param fileStatus The file status - * @param finalOutputPath Final output path - * @param nf Number format - * @param fileSeq The sequence number - * @throws IOException - */ - private void moveResultFromStageToFinal(FileSystem fs, Path stagingResultDir, - FileStatus fileStatus, Path finalOutputPath, - NumberFormat nf, - int fileSeq, boolean changeFileSeq) throws IOException { - if (fileStatus.isDirectory()) { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (!fs.exists(finalSubPath)) { - fs.mkdirs(finalSubPath); - } - int maxSeq = StorageUtil.getMaxFileSequence(fs, finalSubPath, false); - for (FileStatus eachFile : fs.listStatus(fileStatus.getPath())) { - if (eachFile.getPath().getName().startsWith("_")) { - continue; - } - moveResultFromStageToFinal(fs, stagingResultDir, eachFile, finalOutputPath, nf, ++maxSeq, changeFileSeq); - } - } else { - throw new IOException("Wrong staging dir:" + stagingResultDir + "," + fileStatus.getPath()); - } - } else { - String subPath = extractSubPath(stagingResultDir, fileStatus.getPath()); - if (subPath != null) { - Path finalSubPath = new Path(finalOutputPath, subPath); - if (changeFileSeq) { - finalSubPath = new Path(finalSubPath.getParent(), replaceFileNameSeq(finalSubPath, fileSeq, nf)); - } - if (!fs.exists(finalSubPath.getParent())) { - fs.mkdirs(finalSubPath.getParent()); - } - if (fs.exists(finalSubPath)) { - throw new IOException("Already exists data file:" + finalSubPath); - } - boolean success = fs.rename(fileStatus.getPath(), finalSubPath); - if (success) { - LOG.info("Moving staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } else { - LOG.error("Can't move staging file[" + fileStatus.getPath() + "] + " + - "to final output[" + finalSubPath + "]"); - } 
- } - } - } - - /** - * Removes the path of the parent. - * @param parentPath - * @param childPath - * @return - */ - private String extractSubPath(Path parentPath, Path childPath) { - String parentPathStr = parentPath.toUri().getPath(); - String childPathStr = childPath.toUri().getPath(); - - if (parentPathStr.length() > childPathStr.length()) { - return null; - } - - int index = childPathStr.indexOf(parentPathStr); - if (index != 0) { - return null; - } - - return childPathStr.substring(parentPathStr.length() + 1); - } - - /** - * Attach the sequence number to a path. - * - * @param path Path - * @param seq sequence number - * @param nf Number format - * @return New path attached with sequence number - * @throws IOException - */ - private String replaceFileNameSeq(Path path, int seq, NumberFormat nf) throws IOException { - String[] tokens = path.getName().split("-"); - if (tokens.length != 4) { - throw new IOException("Wrong result file name:" + path); - } - return tokens[0] + "-" + tokens[1] + "-" + tokens[2] + "-" + nf.format(seq); - } - - /** - * Make sure all files are moved. - * @param fs FileSystem - * @param stagingPath The stagind directory - * @return - * @throws IOException - */ - private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - if (files != null && files.length != 0) { - for (FileStatus eachFile: files) { - if (eachFile.isFile()) { - LOG.error("There are some unmoved files in staging dir:" + eachFile.getPath()); - return false; - } else { - if (verifyAllFileMoved(fs, eachFile.getPath())) { - fs.delete(eachFile.getPath(), false); - } else { - return false; - } - } - } - } - - return true; - } - - /** - * This method sets a rename map which includes renamed staging directory to final output directory recursively. - * If there exists some data files, this delete it for duplicate data. 
- * - * - * @param fs - * @param stagingPath - * @param outputPath - * @param stagingParentPathString - * @throws IOException - */ - private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, - String stagingParentPathString, - Map<Path, Path> renameDirs, Path oldTableDir) throws IOException { - FileStatus[] files = fs.listStatus(stagingPath); - - for(FileStatus eachFile : files) { - if (eachFile.isDirectory()) { - Path oldPath = eachFile.getPath(); - - // Make recover directory. - String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, - oldTableDir.toString()); - Path recoveryPath = new Path(recoverPathString); - if (!fs.exists(recoveryPath)) { - fs.mkdirs(recoveryPath); - } - - visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, - renameDirs, oldTableDir); - // Find last order partition for renaming - String newPathString = oldPath.toString().replaceAll(stagingParentPathString, - outputPath.toString()); - Path newPath = new Path(newPathString); - if (!isLeafDirectory(fs, eachFile.getPath())) { - renameDirs.put(eachFile.getPath(), newPath); - } else { - if (!fs.exists(newPath)) { - fs.mkdirs(newPath); - } - } - } - } - } - - private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { - boolean retValue = false; - - FileStatus[] files = fs.listStatus(path); - for (FileStatus file : files) { - if (fs.isDirectory(file.getPath())) { - retValue = true; - break; - } - } - - return retValue; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java b/tajo-storage/src/main/java/org/apache/tajo/storage/StorageProperty.java deleted file mode 100644 index 6816d08..0000000 --- 
/**
 * Simple holder of two boolean capabilities of a storage:
 * whether it supports INSERT INTO and whether inserts are sorted.
 * Both flags are {@code false} until set.
 */
public class StorageProperty {
  /** Whether the storage supports INSERT INTO. */
  private boolean supportsInsertInto;
  /** Whether inserts into the storage are sorted. */
  private boolean sortedInsert;

  /** @return the current value of the "supports INSERT INTO" flag */
  public boolean isSupportsInsertInto() {
    return this.supportsInsertInto;
  }

  /** @param supportsInsertInto new value of the "supports INSERT INTO" flag */
  public void setSupportsInsertInto(boolean supportsInsertInto) {
    this.supportsInsertInto = supportsInsertInto;
  }

  /** @return the current value of the "sorted insert" flag */
  public boolean isSortedInsert() {
    return this.sortedInsert;
  }

  /** @param sortedInsert new value of the "sorted insert" flag */
  public void setSortedInsert(boolean sortedInsert) {
    this.sortedInsert = sortedInsert;
  }
}
more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.catalog.proto.CatalogProtos; -import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; -import org.apache.tajo.util.FileUtil; -import org.apache.tajo.util.KeyValueSet; -import parquet.hadoop.ParquetOutputFormat; -import sun.nio.ch.DirectBuffer; - -import java.io.DataInput; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.List; - -public class StorageUtil extends StorageConstants { - public static int getRowByteSize(Schema schema) { - int sum = 0; - for(Column col : schema.getColumns()) { - sum += StorageUtil.getColByteSize(col); - } - - return sum; - } - - public static int getColByteSize(Column col) { - switch (col.getDataType().getType()) { - case BOOLEAN: - return 1; - case CHAR: - return 1; - case BIT: - return 1; - case INT2: 
- return 2; - case INT4: - return 4; - case INT8: - return 8; - case FLOAT4: - return 4; - case FLOAT8: - return 8; - case INET4: - return 4; - case INET6: - return 32; - case TEXT: - return 256; - case BLOB: - return 256; - case DATE: - return 4; - case TIME: - return 8; - case TIMESTAMP: - return 8; - default: - return 0; - } - } - - public static void writeTableMeta(Configuration conf, Path tableroot, TableMeta meta) throws IOException { - FileSystem fs = tableroot.getFileSystem(conf); - FSDataOutputStream out = fs.create(new Path(tableroot, ".meta")); - FileUtil.writeProto(out, meta.getProto()); - out.flush(); - out.close(); - } - - public static Path concatPath(String parent, String...childs) { - return concatPath(new Path(parent), childs); - } - - public static Path concatPath(Path parent, String...childs) { - StringBuilder sb = new StringBuilder(); - - for(int i=0; i < childs.length; i++) { - sb.append(childs[i]); - if(i < childs.length - 1) - sb.append("/"); - } - - return new Path(parent, sb.toString()); - } - - public static KeyValueSet newPhysicalProperties(CatalogProtos.StoreType type) { - KeyValueSet options = new KeyValueSet(); - if (CatalogProtos.StoreType.CSV == type) { - options.set(CSVFILE_DELIMITER, DEFAULT_FIELD_DELIMITER); - } else if (CatalogProtos.StoreType.RCFILE == type) { - options.set(RCFILE_SERDE, DEFAULT_BINARY_SERDE); - } else if (CatalogProtos.StoreType.SEQUENCEFILE == type) { - options.set(SEQUENCEFILE_SERDE, DEFAULT_TEXT_SERDE); - options.set(SEQUENCEFILE_DELIMITER, DEFAULT_FIELD_DELIMITER); - } else if (type == CatalogProtos.StoreType.PARQUET) { - options.set(ParquetOutputFormat.BLOCK_SIZE, PARQUET_DEFAULT_BLOCK_SIZE); - options.set(ParquetOutputFormat.PAGE_SIZE, PARQUET_DEFAULT_PAGE_SIZE); - options.set(ParquetOutputFormat.COMPRESSION, PARQUET_DEFAULT_COMPRESSION_CODEC_NAME); - options.set(ParquetOutputFormat.ENABLE_DICTIONARY, PARQUET_DEFAULT_IS_DICTIONARY_ENABLED); - options.set(ParquetOutputFormat.VALIDATION, 
PARQUET_DEFAULT_IS_VALIDATION_ENABLED); - } - - return options; - } - - static final String fileNamePatternV08 = "part-[0-9]*-[0-9]*"; - static final String fileNamePatternV09 = "part-[0-9]*-[0-9]*-[0-9]*"; - - /** - * Written files can be one of two forms: "part-[0-9]*-[0-9]*" or "part-[0-9]*-[0-9]*-[0-9]*". - * - * This method finds the maximum sequence number from existing data files through the above patterns. - * If it cannot find any matched file or the maximum number, it will return -1. - * - * @param fs - * @param path - * @param recursive - * @return The maximum sequence number - * @throws java.io.IOException - */ - public static int getMaxFileSequence(FileSystem fs, Path path, boolean recursive) throws IOException { - if (!fs.isDirectory(path)) { - return -1; - } - - FileStatus[] files = fs.listStatus(path); - - if (files == null || files.length == 0) { - return -1; - } - - int maxValue = -1; - List<Path> fileNamePatternMatchedList = new ArrayList<Path>(); - - for (FileStatus eachFile: files) { - // In the case of partition table, return largest value within all partition dirs. 
- if (eachFile.isDirectory() && recursive) { - int value = getMaxFileSequence(fs, eachFile.getPath(), recursive); - if (value > maxValue) { - maxValue = value; - } - } else { - if (eachFile.getPath().getName().matches(fileNamePatternV08) || - eachFile.getPath().getName().matches(fileNamePatternV09)) { - fileNamePatternMatchedList.add(eachFile.getPath()); - } - } - } - - if (fileNamePatternMatchedList.isEmpty()) { - return maxValue; - } - Path lastFile = fileNamePatternMatchedList.get(fileNamePatternMatchedList.size() - 1); - String pathName = lastFile.getName(); - - // 0.8: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq> - // 0.9: pathName = part-<ExecutionBlockId.seq>-<QueryUnitId.seq>-<Sequence> - String[] pathTokens = pathName.split("-"); - if (pathTokens.length == 3) { - return -1; - } else if(pathTokens.length == 4) { - return Integer.parseInt(pathTokens[3]); - } else { - return -1; - } - } - - public static void closeBuffer(ByteBuffer buffer) { - if (buffer != null) { - if (buffer.isDirect()) { - ((DirectBuffer) buffer).cleaner().clean(); - } else { - buffer.clear(); - } - } - } - - public static int readFully(InputStream is, byte[] buffer, int offset, int length) - throws IOException { - int nread = 0; - while (nread < length) { - int nbytes = is.read(buffer, offset + nread, length - nread); - if (nbytes < 0) { - return nread > 0 ? nread : nbytes; - } - nread += nbytes; - } - return nread; - } - - /** - * Similar to readFully(). Skips bytes in a loop. - * @param in The DataInput to skip bytes from - * @param len number of bytes to skip. - * @throws java.io.IOException if it could not skip requested number of bytes - * for any reason (including EOF) - */ - public static void skipFully(DataInput in, int len) throws IOException { - int amt = len; - while (amt > 0) { - long ret = in.skipBytes(amt); - if (ret == 0) { - // skip may return 0 even if we're not at EOF. Luckily, we can - // use the read() method to figure out if we're at the end. 
- int b = in.readByte(); - if (b == -1) { - throw new EOFException( "Premature EOF from inputStream after " + - "skipping " + (len - amt) + " byte(s)."); - } - ret = 1; - } - amt -= ret; - } - } - - public static boolean isFileStorageType(StoreType storageType) { - return true; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java deleted file mode 100644 index a2c08de..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.tajo.storage; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.statistics.ColumnStats; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.common.TajoDataTypes.DataType; -import org.apache.tajo.common.TajoDataTypes.Type; -import org.apache.tajo.datum.Datum; -import org.apache.tajo.datum.NullDatum; - -/** - * This class is not thread-safe. - */ -public class TableStatistics { - private static final Log LOG = LogFactory.getLog(TableStatistics.class); - private Schema schema; - private Tuple minValues; - private Tuple maxValues; - private long [] numNulls; - private long numRows = 0; - private long numBytes = 0; - - private boolean [] comparable; - - public TableStatistics(Schema schema) { - this.schema = schema; - minValues = new VTuple(schema.size()); - maxValues = new VTuple(schema.size()); - - numNulls = new long[schema.size()]; - comparable = new boolean[schema.size()]; - - DataType type; - for (int i = 0; i < schema.size(); i++) { - type = schema.getColumn(i).getDataType(); - if (type.getType() == Type.PROTOBUF) { - comparable[i] = false; - } else { - comparable[i] = true; - } - } - } - - public Schema getSchema() { - return this.schema; - } - - public void incrementRow() { - numRows++; - } - - public long getNumRows() { - return this.numRows; - } - - public void setNumBytes(long bytes) { - this.numBytes = bytes; - } - - public long getNumBytes() { - return this.numBytes; - } - - public void analyzeField(int idx, Datum datum) { - if (datum instanceof NullDatum) { - numNulls[idx]++; - return; - } - - if (comparable[idx]) { - if (!maxValues.contains(idx) || - maxValues.get(idx).compareTo(datum) < 0) { - maxValues.put(idx, datum); - } - if (!minValues.contains(idx) || - minValues.get(idx).compareTo(datum) > 0) { - minValues.put(idx, datum); - } - } - } - - public TableStats getTableStat() { - 
TableStats stat = new TableStats(); - - ColumnStats columnStats; - for (int i = 0; i < schema.size(); i++) { - columnStats = new ColumnStats(schema.getColumn(i)); - columnStats.setNumNulls(numNulls[i]); - if (minValues.get(i) == null || schema.getColumn(i).getDataType().getType() == minValues.get(i).type()) { - columnStats.setMinValue(minValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + minValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); - } - if (maxValues.get(i) == null || schema.getColumn(i).getDataType().getType() == maxValues.get(i).type()) { - columnStats.setMaxValue(maxValues.get(i)); - } else { - LOG.warn("Wrong statistics column type (" + maxValues.get(i).type() + - ", expected=" + schema.getColumn(i).getDataType().getType() + ")"); - } - stat.addColumnStat(columnStats); - } - - stat.setNumRows(this.numRows); - stat.setNumBytes(this.numBytes); - - return stat; - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java deleted file mode 100644 index 094d285..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TextSerializerDeserializer.java +++ /dev/null @@ -1,226 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.protobuf.Message; -import org.apache.commons.codec.binary.Base64; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.common.TajoDataTypes; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.datum.*; -import org.apache.tajo.datum.protobuf.ProtobufJsonFormat; -import org.apache.tajo.util.Bytes; -import org.apache.tajo.util.NumberUtil; - -import java.io.IOException; -import java.io.OutputStream; - -//Compatibility with Apache Hive -@Deprecated -public class TextSerializerDeserializer implements SerializerDeserializer { - public static final byte[] trueBytes = "true".getBytes(); - public static final byte[] falseBytes = "false".getBytes(); - private ProtobufJsonFormat protobufJsonFormat = ProtobufJsonFormat.getInstance(); - - - @Override - public int serialize(Column col, Datum datum, OutputStream out, byte[] nullCharacters) throws IOException { - - byte[] bytes; - int length = 0; - TajoDataTypes.DataType dataType = col.getDataType(); - - if (datum == null || datum instanceof NullDatum) { - switch (dataType.getType()) { - case CHAR: - case TEXT: - length = nullCharacters.length; - out.write(nullCharacters); - break; - default: - break; - } - return length; - } - - switch (dataType.getType()) { - case BOOLEAN: - out.write(datum.asBool() ? 
trueBytes : falseBytes); - length = trueBytes.length; - break; - case CHAR: - byte[] pad = new byte[dataType.getLength() - datum.size()]; - bytes = datum.asTextBytes(); - out.write(bytes); - out.write(pad); - length = bytes.length + pad.length; - break; - case TEXT: - case BIT: - case INT2: - case INT4: - case INT8: - case FLOAT4: - case FLOAT8: - case INET4: - case DATE: - case INTERVAL: - bytes = datum.asTextBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIME: - bytes = ((TimeDatum)datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); - length = bytes.length; - out.write(bytes); - break; - case TIMESTAMP: - bytes = ((TimestampDatum)datum).asChars(TajoConf.getCurrentTimeZone(), true).getBytes(); - length = bytes.length; - out.write(bytes); - break; - case INET6: - case BLOB: - bytes = Base64.encodeBase64(datum.asByteArray(), false); - length = bytes.length; - out.write(bytes, 0, length); - break; - case PROTOBUF: - ProtobufDatum protobuf = (ProtobufDatum) datum; - byte[] protoBytes = protobufJsonFormat.printToString(protobuf.get()).getBytes(); - length = protoBytes.length; - out.write(protoBytes, 0, protoBytes.length); - break; - case NULL_TYPE: - default: - break; - } - return length; - } - - @Override - public Datum deserialize(Column col, byte[] bytes, int offset, int length, byte[] nullCharacters) throws IOException { - - Datum datum; - switch (col.getDataType().getType()) { - case BOOLEAN: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createBool(bytes[offset] == 't' || bytes[offset] == 'T'); - break; - case BIT: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createBit(Byte.parseByte(new String(bytes, offset, length))); - break; - case CHAR: - datum = isNullText(bytes, offset, length, nullCharacters) ? 
NullDatum.get() - : DatumFactory.createChar(new String(bytes, offset, length).trim()); - break; - case INT1: - case INT2: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInt2((short) NumberUtil.parseInt(bytes, offset, length)); - break; - case INT4: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInt4(NumberUtil.parseInt(bytes, offset, length)); - break; - case INT8: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInt8(new String(bytes, offset, length)); - break; - case FLOAT4: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createFloat4(new String(bytes, offset, length)); - break; - case FLOAT8: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createFloat8(NumberUtil.parseDouble(bytes, offset, length)); - break; - case TEXT: { - byte[] chars = new byte[length]; - System.arraycopy(bytes, offset, chars, 0, length); - datum = isNullText(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createText(chars); - break; - } - case DATE: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createDate(new String(bytes, offset, length)); - break; - case TIME: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createTime(new String(bytes, offset, length)); - break; - case TIMESTAMP: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createTimestamp(new String(bytes, offset, length)); - break; - case INTERVAL: - datum = isNull(bytes, offset, length, nullCharacters) ? 
NullDatum.get() - : DatumFactory.createInterval(new String(bytes, offset, length)); - break; - case PROTOBUF: { - if (isNull(bytes, offset, length, nullCharacters)) { - datum = NullDatum.get(); - } else { - ProtobufDatumFactory factory = ProtobufDatumFactory.get(col.getDataType()); - Message.Builder builder = factory.newBuilder(); - try { - byte[] protoBytes = new byte[length]; - System.arraycopy(bytes, offset, protoBytes, 0, length); - protobufJsonFormat.merge(protoBytes, builder); - datum = factory.createDatum(builder.build()); - } catch (IOException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - break; - } - case INET4: - datum = isNull(bytes, offset, length, nullCharacters) ? NullDatum.get() - : DatumFactory.createInet4(new String(bytes, offset, length)); - break; - case BLOB: { - if (isNull(bytes, offset, length, nullCharacters)) { - datum = NullDatum.get(); - } else { - byte[] blob = new byte[length]; - System.arraycopy(bytes, offset, blob, 0, length); - datum = DatumFactory.createBlob(Base64.decodeBase64(blob)); - } - break; - } - default: - datum = NullDatum.get(); - break; - } - return datum; - } - - private static boolean isNull(byte[] val, int offset, int length, byte[] nullBytes) { - return length == 0 || ((length == nullBytes.length) - && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length)); - } - - private static boolean isNullText(byte[] val, int offset, int length, byte[] nullBytes) { - return length > 0 && length == nullBytes.length - && Bytes.equals(val, offset, length, nullBytes, 0, nullBytes.length); - } -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java deleted file mode 100644 index 8dffd8d..0000000 
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleComparator.java +++ /dev/null @@ -1,32 +0,0 @@ -/*** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import org.apache.tajo.common.ProtoObject; - -import java.util.Comparator; - -import static org.apache.tajo.index.IndexProtos.TupleComparatorProto; - -public abstract class TupleComparator implements Comparator<Tuple>, ProtoObject<TupleComparatorProto> { - - public abstract int compare(Tuple o1, Tuple o2); - - public abstract boolean isAscendingFirstKey(); -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java b/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java deleted file mode 100644 index e824b99..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/TupleRange.java +++ /dev/null @@ -1,112 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage; - -import com.google.common.base.Objects; -import org.apache.tajo.catalog.Schema; -import org.apache.tajo.catalog.SortSpec; - -import java.util.Comparator; - -/** - * It represents a pair of start and end tuples. - */ -public class TupleRange implements Comparable<TupleRange>, Cloneable { - private Tuple start; - private Tuple end; - private final TupleComparator comp; - - public TupleRange(final SortSpec[] sortSpecs, final Tuple start, final Tuple end) { - this.comp = new BaseTupleComparator(sortSpecsToSchema(sortSpecs), sortSpecs); - // if there is only one value, start == end - this.start = start; - this.end = end; - } - - public static Schema sortSpecsToSchema(SortSpec[] sortSpecs) { - Schema schema = new Schema(); - for (SortSpec spec : sortSpecs) { - schema.addColumn(spec.getSortKey()); - } - - return schema; - } - - public void setStart(Tuple tuple) { - this.start = tuple; - } - - public final Tuple getStart() { - return this.start; - } - - public void setEnd(Tuple tuple) { - this.end = tuple; - } - - public final Tuple getEnd() { - return this.end; - } - - public String toString() { - return "[" + this.start + ", " + this.end + ")"; - } - - @Override - public int hashCode() { - return Objects.hashCode(start, end); - } - - 
@Override - public boolean equals(Object obj) { - if (obj instanceof TupleRange) { - TupleRange other = (TupleRange) obj; - return this.start.equals(other.start) && this.end.equals(other.end); - } else { - return false; - } - } - - @Override - public int compareTo(TupleRange o) { - // TODO - should handle overlap - int cmpVal = comp.compare(this.start, o.start); - if (cmpVal != 0) { - return cmpVal; - } else { - return comp.compare(this.end, o.end); - } - } - - public static class DescendingTupleRangeComparator - implements Comparator<TupleRange> { - - @Override - public int compare(TupleRange left, TupleRange right) { - return right.compareTo(left); - } - } - - public TupleRange clone() throws CloneNotSupportedException { - TupleRange newRange = (TupleRange) super.clone(); - newRange.setStart(start.clone()); - newRange.setEnd(end.clone()); - return newRange; - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java b/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java deleted file mode 100644 index ad19101..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/annotation/ForSplitableStore.java +++ /dev/null @@ -1,29 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.annotation; - -import java.lang.annotation.ElementType; -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; - -@Retention(RetentionPolicy.RUNTIME) -@Target(ElementType.TYPE) -public @interface ForSplitableStore { -} http://git-wip-us.apache.org/repos/asf/tajo/blob/dfd7f996/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java ---------------------------------------------------------------------- diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java b/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java deleted file mode 100644 index 9e1e7ea..0000000 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/avro/AvroAppender.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.storage.avro; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; - -import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumWriter; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.Path; -import org.apache.tajo.QueryUnitAttemptId; -import org.apache.tajo.catalog.Column; -import org.apache.tajo.catalog.statistics.TableStats; -import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.datum.NullDatum; -import org.apache.tajo.storage.FileAppender; -import org.apache.tajo.storage.TableStatistics; -import org.apache.tajo.storage.Tuple; - -import java.io.IOException; - -/** - * FileAppender for writing to Avro files. - */ -public class AvroAppender extends FileAppender { - private TableStatistics stats; - private Schema avroSchema; - private List<Schema.Field> avroFields; - private DataFileWriter<GenericRecord> dataFileWriter; - - /** - * Creates a new AvroAppender. - * - * @param conf Configuration properties. - * @param schema The table schema. - * @param meta The table metadata. - * @param workDir The path of the Avro file to write to. - */ - public AvroAppender(Configuration conf, - QueryUnitAttemptId taskAttemptId, - org.apache.tajo.catalog.Schema schema, - TableMeta meta, Path workDir) throws IOException { - super(conf, taskAttemptId, schema, meta, workDir); - } - - /** - * Initializes the Appender.
- */ - public void init() throws IOException { - FileSystem fs = path.getFileSystem(conf); - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException(path.toString()); - } - FSDataOutputStream outputStream = fs.create(path); - - avroSchema = AvroUtil.getAvroSchema(meta, conf); - avroFields = avroSchema.getFields(); - - DatumWriter<GenericRecord> datumWriter = - new GenericDatumWriter<GenericRecord>(avroSchema); - dataFileWriter = new DataFileWriter<GenericRecord>(datumWriter); - dataFileWriter.create(avroSchema, outputStream); - - if (enabledStats) { - this.stats = new TableStatistics(schema); - } - super.init(); - } - - /** - * Gets the current offset. Tracking offsets is currently not implemented, so - * this method always returns 0. - * - * @return 0 - */ - @Override - public long getOffset() throws IOException { - return 0; - } - - private Object getPrimitive(Tuple tuple, int i, Schema.Type avroType) { - if (tuple.get(i) instanceof NullDatum) { - return null; - } - switch (avroType) { - case NULL: - return null; - case BOOLEAN: - return tuple.getBool(i); - case INT: - return tuple.getInt4(i); - case LONG: - return tuple.getInt8(i); - case FLOAT: - return tuple.getFloat4(i); - case DOUBLE: - return tuple.getFloat8(i); - case BYTES: - case FIXED: - return ByteBuffer.wrap(tuple.getBytes(i)); - case STRING: - return tuple.getText(i); - default: - throw new RuntimeException("Unknown primitive type."); - } - } - - /** - * Write a Tuple to the Avro file. - * - * @param tuple The Tuple to write.
- */ - @Override - public void addTuple(Tuple tuple) throws IOException { - GenericRecord record = new GenericData.Record(avroSchema); - for (int i = 0; i < schema.size(); ++i) { - Column column = schema.getColumn(i); - if (enabledStats) { - stats.analyzeField(i, tuple.get(i)); - } - Object value; - Schema.Field avroField = avroFields.get(i); - Schema.Type avroType = avroField.schema().getType(); - switch (avroType) { - case NULL: - case BOOLEAN: - case INT: - case LONG: - case FLOAT: - case DOUBLE: - case BYTES: - case STRING: - case FIXED: - value = getPrimitive(tuple, i, avroType); - break; - case RECORD: - throw new RuntimeException("Avro RECORD not supported."); - case ENUM: - throw new RuntimeException("Avro ENUM not supported."); - case MAP: - throw new RuntimeException("Avro MAP not supported."); - case UNION: - List<Schema> schemas = avroField.schema().getTypes(); - if (schemas.size() != 2) { - throw new RuntimeException("Avro UNION not supported."); - } - if (schemas.get(0).getType().equals(Schema.Type.NULL)) { - value = getPrimitive(tuple, i, schemas.get(1).getType()); - } else if (schemas.get(1).getType().equals(Schema.Type.NULL)) { - value = getPrimitive(tuple, i, schemas.get(0).getType()); - } else { - throw new RuntimeException("Avro UNION not supported."); - } - break; - default: - throw new RuntimeException("Unknown type: " + avroType); - } - record.put(i, value); - } - dataFileWriter.append(record); - - if (enabledStats) { - stats.incrementRow(); - } - } - - /** - * Flushes the current state of the file. - */ - @Override - public void flush() throws IOException { - dataFileWriter.flush(); - } - - /** - * Closes the Appender. - */ - @Override - public void close() throws IOException { - dataFileWriter.close(); - } - - /** - * If table statistics is enabled, retrieve the table statistics. - * - * @return Table statistics if enabled or null otherwise. 
- */ - @Override - public TableStats getStats() { - if (enabledStats) { - return stats.getTableStat(); - } else { - return null; - } - } -}
