Repository: tajo
Updated Branches:
  refs/heads/master 54bf51678 -> ec1337ff2
TAJO-1928: Can't read parquet on hive meta. Closes #826

Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/ec1337ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/ec1337ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/ec1337ff

Branch: refs/heads/master
Commit: ec1337ff22c1aaff51918f786ff559a2cd8b6f29
Parents: 54bf516
Author: Jinho Kim <[email protected]>
Authored: Fri Oct 16 15:40:30 2015 +0900
Committer: Jinho Kim <[email protected]>
Committed: Fri Oct 16 15:40:30 2015 +0900

----------------------------------------------------------------------
 CHANGES                                          |   1 +
 .../tajo/catalog/store/HiveCatalogStore.java     | 132 +++++++++++--------
 .../tajo/catalog/store/HiveCatalogUtil.java      |  65 +++++----
 .../catalog/store/TestHiveCatalogStore.java      |  62 +++++++--
 .../apache/tajo/storage/StorageConstants.java    |   2 +-
 tajo-dist/src/main/conf/tajo-env.cmd             |   2 +-
 .../apache/tajo/storage/orc/ORCAppender.java     |   2 +-
 7 files changed, 176 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ab2e45f..974e6e1 100644
--- a/CHANGES
+++ b/CHANGES
@@ -338,6 +338,7 @@ Release 0.11.0 - unreleased

   BUG FIXES

+    TAJO-1928: Can't read parquet on hive meta. (jinho)
     TAJO-1923: Selecting on information_schema.table_stats throws an internal error. (jihoon)


http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
index f4e21dd..6355a5f 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java
@@ -28,11 +28,16 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.*;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
 import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.TajoConstants;
 import org.apache.tajo.algebra.Expr;
@@ -52,6 +57,7 @@ import org.apache.tajo.storage.StorageConstants;
 import org.apache.tajo.util.KeyValueSet;
 import org.apache.tajo.util.TUtil;
 import org.apache.thrift.TException;
+import parquet.hadoop.ParquetOutputFormat;

 import java.io.File;
 import java.io.IOException;
@@ -61,9 +67,10 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
   protected final Log LOG = LogFactory.getLog(getClass());

   private static String HIVE_WAREHOUSE_DIR_CONF_KEY = "hive.metastore.warehouse.dir";
+  private static final int CLIENT_POOL_SIZE = 2;
+  private static final StorageFormatFactory storageFormatFactory = new StorageFormatFactory();

   protected Configuration conf;
-  private static final int CLIENT_POOL_SIZE = 2;
   private final HiveCatalogStoreClientPool clientPool;
   private final String defaultTableSpaceUri;
   private final String catalogUri;
@@ -104,12 +111,27 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
     return exist;
   }

+  protected org.apache.hadoop.hive.ql.metadata.Table getHiveTable(String databaseName, final String tableName)
+      throws UndefinedTableException {
+
+    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
+    try {
+      client = clientPool.getClient();
+      return HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
+    } catch (NoSuchObjectException nsoe) {
+      throw new UndefinedTableException(tableName);
+    } catch (Exception e) {
+      throw new TajoInternalError(e);
+    } finally {
+      if (client != null) client.release();
+    }
+  }
+
   @Override
   public final CatalogProtos.TableDescProto getTable(String databaseName, final String tableName)
       throws UndefinedTableException {
     org.apache.hadoop.hive.ql.metadata.Table table = null;
-    HiveCatalogStoreClientPool.HiveCatalogStoreClient client = null;
     Path path = null;
     String dataFormat = null;
     org.apache.tajo.catalog.Schema schema = null;
@@ -122,21 +144,14 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
     //////////////////////////////////
     try {
       // get hive table schema
-      try {
-        client = clientPool.getClient();
-        table = HiveCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName);
-        path = table.getPath();
-      } catch (NoSuchObjectException nsoe) {
-        throw new UndefinedTableException(tableName);
-      } catch (Exception e) {
-        throw new TajoInternalError(e);
-      }
+      table = getHiveTable(databaseName, tableName);
+      path = table.getPath();

       // convert HiveCatalogStore field schema into tajo field schema.
      schema = new org.apache.tajo.catalog.Schema();
       List<FieldSchema> fieldSchemaList = table.getCols();
-      boolean isPartitionKey = false;
+      boolean isPartitionKey;

       for (FieldSchema eachField : fieldSchemaList) {
         isPartitionKey = false;
@@ -184,14 +199,13 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
       }
       options.remove(serdeConstants.SERIALIZATION_NULL_FORMAT);

-      // set file output format
-      String fileOutputformat = properties.getProperty(hive_metastoreConstants.FILE_OUTPUT_FORMAT);
-      dataFormat = HiveCatalogUtil.getDataFormat(fileOutputformat);
-
-      if (dataFormat.equalsIgnoreCase("TEXT")) {
+      dataFormat = HiveCatalogUtil.getDataFormat(table.getSd());
+
+      if (BuiltinStorages.TEXT.equals(dataFormat)) {
         options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
         options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava(nullFormat));
-      } else if (dataFormat.equals("RCFILE")) {
+
+      } else if (BuiltinStorages.RCFILE.equals(dataFormat)) {
         options.set(StorageConstants.RCFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
         String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
         if (LazyBinaryColumnarSerDe.class.getName().equals(serde)) {
@@ -199,7 +213,8 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
         } else if (ColumnarSerDe.class.getName().equals(serde)) {
           options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
         }
-      } else if (dataFormat.equals("SEQUENCEFILE")) {
+
+      } else if (BuiltinStorages.SEQUENCE_FILE.equals(dataFormat)) {
         options.set(StorageConstants.SEQUENCEFILE_DELIMITER, StringEscapeUtils.escapeJava(fieldDelimiter));
         options.set(StorageConstants.SEQUENCEFILE_NULL, StringEscapeUtils.escapeJava(nullFormat));
         String serde = properties.getProperty(serdeConstants.SERIALIZATION_LIB);
@@ -208,6 +223,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
         } else if (LazySimpleSerDe.class.getName().equals(serde)) {
           options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
         }
+
       }

       // set data size
@@ -255,9 +271,8 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
       }
     } catch (Throwable t) {
       throw new TajoInternalError(t);
-    } finally {
-      if(client != null) client.release();
     }
+
     TableMeta meta = new TableMeta(dataFormat, options);
     TableDesc tableDesc = new TableDesc(databaseName + "." + tableName, schema, meta, path.toUri());
     if (table.getTableType().equals(TableType.EXTERNAL_TABLE)) {
@@ -341,7 +356,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {

   @Override
   public void alterTablespace(CatalogProtos.AlterTablespaceProto alterProto) {
-    throw new TajoRuntimeException(new UnsupportedException("Tablespace in HiveMeta"));
+    // SKIP
   }

   @Override
@@ -442,21 +457,18 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
     sd.getSerdeInfo().setParameters(new HashMap<>());
     sd.getSerdeInfo().setName(table.getTableName());

-    // if tajo set location method, thrift client make exception as follows:
-    // Caused by: MetaException(message:java.lang.NullPointerException)
-    // If you want to modify table path, you have to modify on Hive cli.
-    if (tableDesc.isExternal()) {
-      table.setTableType(TableType.EXTERNAL_TABLE.name());
-      table.putToParameters("EXTERNAL", "TRUE");
-
-      Path tablePath = new Path(tableDesc.getUri());
-      FileSystem fs = tablePath.getFileSystem(conf);
-      if (fs.isFile(tablePath)) {
-        LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
-        sd.setLocation(tablePath.getParent().toString());
-      } else {
-        sd.setLocation(tablePath.toString());
-      }
+    // If the table type is a managed table, its location is the Hive warehouse dir,
+    // which would be the wrong path during output committing.
+    table.setTableType(TableType.EXTERNAL_TABLE.name());
+    table.putToParameters("EXTERNAL", "TRUE");
+
+    Path tablePath = new Path(tableDesc.getUri());
+    FileSystem fs = tablePath.getFileSystem(conf);
+    if (fs.isFile(tablePath)) {
+      LOG.warn("A table path is a file, but HiveCatalogStore does not allow a file path.");
+      sd.setLocation(tablePath.getParent().toString());
+    } else {
+      sd.setLocation(tablePath.toString());
     }

     // set column information
@@ -480,14 +492,15 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
     }

     if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.RCFILE)) {
+      StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.RCFILE);
+      sd.setInputFormat(descriptor.getInputFormat());
+      sd.setOutputFormat(descriptor.getOutputFormat());
+
       String serde = tableDesc.getMeta().getOption(StorageConstants.RCFILE_SERDE);
-      sd.setInputFormat(org.apache.hadoop.hive.ql.io.RCFileInputFormat.class.getName());
-      sd.setOutputFormat(org.apache.hadoop.hive.ql.io.RCFileOutputFormat.class.getName());
       if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
-        sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe.class.getName());
+        sd.getSerdeInfo().setSerializationLib(ColumnarSerDe.class.getName());
       } else {
-        sd.getSerdeInfo().setSerializationLib(
-            org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe.class.getName());
+        sd.getSerdeInfo().setSerializationLib(LazyBinaryColumnarSerDe.class.getName());
       }

       if (tableDesc.getMeta().getOptions().containsKey(StorageConstants.RCFILE_NULL)) {
@@ -495,9 +508,10 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
             StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.RCFILE_NULL)));
       }
     } else if (tableDesc.getMeta().getDataFormat().equals(BuiltinStorages.TEXT)) {
-      sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
-      sd.setInputFormat(org.apache.hadoop.mapred.TextInputFormat.class.getName());
-      sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.class.getName());
+      // TextFileStorageFormatDescriptor contains a deprecated class, so the class names are set directly.
+      sd.setInputFormat(TextInputFormat.class.getName());
+      sd.setOutputFormat(HiveIgnoreKeyTextOutputFormat.class.getName());
+      sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());

       String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.TEXT_DELIMITER,
           StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -519,12 +533,14 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
         table.getParameters().remove(StorageConstants.TEXT_NULL);
       }
     } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.SEQUENCE_FILE)) {
+      StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.SEQUENCEFILE);
+      sd.setInputFormat(descriptor.getInputFormat());
+      sd.setOutputFormat(descriptor.getOutputFormat());
+
       String serde = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_SERDE);
-      sd.setInputFormat(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getName());
-      sd.setOutputFormat(org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat.class.getName());

       if (StorageConstants.DEFAULT_TEXT_SERDE.equals(serde)) {
-        sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.class.getName());
+        sd.getSerdeInfo().setSerializationLib(LazySimpleSerDe.class.getName());

         String fieldDelimiter = tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_DELIMITER,
             StorageConstants.DEFAULT_FIELD_DELIMITER);
@@ -540,7 +556,7 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
             StringEscapeUtils.unescapeJava(fieldDelimiter));
         table.getParameters().remove(StorageConstants.SEQUENCEFILE_DELIMITER);
       } else {
-        sd.getSerdeInfo().setSerializationLib(org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe.class.getName());
+        sd.getSerdeInfo().setSerializationLib(LazyBinarySerDe.class.getName());
       }

       if (tableDesc.getMeta().containsOption(StorageConstants.SEQUENCEFILE_NULL)) {
@@ -548,14 +564,18 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {
             StringEscapeUtils.unescapeJava(tableDesc.getMeta().getOption(StorageConstants.SEQUENCEFILE_NULL)));
         table.getParameters().remove(StorageConstants.SEQUENCEFILE_NULL);
       }
-    } else {
-      if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET)) {
-        sd.setInputFormat(parquet.hive.DeprecatedParquetInputFormat.class.getName());
-        sd.setOutputFormat(parquet.hive.DeprecatedParquetOutputFormat.class.getName());
-        sd.getSerdeInfo().setSerializationLib(parquet.hive.serde.ParquetHiveSerDe.class.getName());
-      } else {
-        throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HivecatalogStore");
+    } else if (tableDesc.getMeta().getDataFormat().equalsIgnoreCase(BuiltinStorages.PARQUET)) {
+      StorageFormatDescriptor descriptor = storageFormatFactory.get(IOConstants.PARQUET);
+      sd.setInputFormat(descriptor.getInputFormat());
+      sd.setOutputFormat(descriptor.getOutputFormat());
+      sd.getSerdeInfo().setSerializationLib(descriptor.getSerde());
+
+      if (tableDesc.getMeta().containsOption(ParquetOutputFormat.COMPRESSION)) {
+        table.putToParameters(ParquetOutputFormat.COMPRESSION,
+            tableDesc.getMeta().getOption(ParquetOutputFormat.COMPRESSION));
       }
+    } else {
+      throw new UnsupportedException(tableDesc.getMeta().getDataFormat() + " in HiveCatalogStore");
     }

     sd.setSortCols(new ArrayList<>());
@@ -1293,6 +1313,6 @@ public class HiveCatalogStore extends CatalogConstants implements CatalogStore {

   @Override
   public List<TablespaceProto> getTablespaces() {
-    throw new UnsupportedOperationException();
+    return Lists.newArrayList(getTablespace(TajoConstants.DEFAULT_TABLESPACE_NAME));
   }
 }


http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
index 9e1da2b..bbb7ade 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogUtil.java
@@ -20,17 +20,25 @@ package org.apache.tajo.catalog.store;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
-import org.apache.hadoop.hive.ql.io.RCFileOutputFormat;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
+import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
+import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
+import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.tajo.BuiltinStorages;
-import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
-import org.apache.tajo.exception.*;
+import org.apache.tajo.exception.LMDNoMatchedDatatypeException;
+import org.apache.tajo.exception.TajoRuntimeException;
+import org.apache.tajo.exception.UnknownDataFormatException;
+import org.apache.tajo.exception.UnsupportedException;
 import org.apache.thrift.TException;
-import parquet.hadoop.mapred.DeprecatedParquetOutputFormat;

 public class HiveCatalogUtil {
   public static void validateSchema(Table tblSchema) {
@@ -99,25 +107,38 @@ public class HiveCatalogUtil {
     }
   }

-  public static String getDataFormat(String fileFormat) {
-    Preconditions.checkNotNull(fileFormat);
+  public static String getDataFormat(StorageDescriptor descriptor) {
+    Preconditions.checkNotNull(descriptor);

-    String[] fileFormatArrary = fileFormat.split("\\.");
-    if(fileFormatArrary.length < 1) {
-      throw new TajoRuntimeException(new UnknownDataFormatException(fileFormat));
-    }
+    String serde = descriptor.getSerdeInfo().getSerializationLib();
+    String inputFormat = descriptor.getInputFormat();

-    String outputFormatClass = fileFormatArrary[fileFormatArrary.length-1];
-    if(outputFormatClass.equals(HiveIgnoreKeyTextOutputFormat.class.getSimpleName())) {
-      return BuiltinStorages.TEXT;
-    } else if(outputFormatClass.equals(HiveSequenceFileOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.SEQUENCEFILE.name();
-    } else if(outputFormatClass.equals(RCFileOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.RCFILE.name();
-    } else if(outputFormatClass.equals(DeprecatedParquetOutputFormat.class.getSimpleName())) {
-      return CatalogProtos.DataFormat.PARQUET.name();
+    if (LazySimpleSerDe.class.getName().equals(serde)) {
+      if (TextInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.TEXT;
+      } else if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.SEQUENCE_FILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (LazyBinarySerDe.class.getName().equals(serde)) {
+      if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.SEQUENCE_FILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (LazyBinaryColumnarSerDe.class.getName().equals(serde) || ColumnarSerDe.class.getName().equals(serde)) {
+      if (RCFileInputFormat.class.getName().equals(inputFormat)) {
+        return BuiltinStorages.RCFILE;
+      } else {
+        throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
+      }
+    } else if (ParquetHiveSerDe.class.getName().equals(serde)) {
+      return BuiltinStorages.PARQUET;
+    } else if (AvroSerDe.class.getName().equals(serde)) {
+      return BuiltinStorages.AVRO;
     } else {
-      throw new TajoRuntimeException(new UnknownDataFormatException(fileFormat));
+      throw new TajoRuntimeException(new UnknownDataFormatException(inputFormat));
     }
   }


http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
index 43de047..7608a7b 100644
--- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
+++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java
@@ -24,6 +24,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
+import org.apache.hadoop.hive.ql.io.IOConstants;
+import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
+import org.apache.hadoop.hive.ql.io.StorageFormatFactory;
+import org.apache.tajo.BuiltinStorages;
 import org.apache.tajo.catalog.*;
 import org.apache.tajo.catalog.partition.PartitionDesc;
 import org.apache.tajo.catalog.partition.PartitionMethodDesc;
@@ -60,9 +65,11 @@ public class TestHiveCatalogStore {

   private static HiveCatalogStore store;
   private static Path warehousePath;
+  private static StorageFormatFactory formatFactory;

   @BeforeClass
   public static void setUp() throws Exception {
+    formatFactory = new StorageFormatFactory();
     Path testPath = CommonTestingUtil.getTestDir();
     warehousePath = new Path(testPath, "warehouse");

@@ -86,7 +93,7 @@ public class TestHiveCatalogStore {

   @Test
   public void testTableUsingTextFile() throws Exception {
-    TableMeta meta = new TableMeta("TEXT", new KeyValueSet());
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet());

     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
    schema.addColumn("c_custkey", TajoDataTypes.Type.INT4);
@@ -103,6 +110,12 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, CUSTOMER));

+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.TEXTFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    // IgnoreKeyTextOutputFormat is deprecated, so HiveIgnoreKeyTextOutputFormat is expected here.
+    assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -120,7 +133,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingRCFileWithBinarySerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE);
-    TableMeta meta = new TableMeta("RCFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);

     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -132,6 +145,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));

+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.RCFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -149,7 +167,7 @@ public class TestHiveCatalogStore {
   public void testTableUsingRCFileWithTextSerde() throws Exception {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.RCFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE);
-    TableMeta meta = new TableMeta("RCFILE", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.RCFILE, options);

     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
     schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4);
@@ -161,6 +179,11 @@ public class TestHiveCatalogStore {
     store.createTable(table.getProto());
     assertTrue(store.existTable(DB_NAME, REGION));

+    StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.RCFILE);
+    org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION);
+    assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat());
+    assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat());
+
     TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION));
     assertEquals(table.getName(), table1.getName());
     assertEquals(table.getUri(), table1.getUri());
@@ -178,7 +201,7 @@ public class TestHiveCatalogStore {
     KeyValueSet options = new KeyValueSet();
     options.set(StorageConstants.TEXT_DELIMITER, StringEscapeUtils.escapeJava("\u0002"));
     options.set(StorageConstants.TEXT_NULL, StringEscapeUtils.escapeJava("\u0003"));
-    TableMeta meta = new TableMeta("TEXT", options);
+    TableMeta meta = new TableMeta(BuiltinStorages.TEXT, options);

     org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema();
schema.addColumn("s_suppkey", TajoDataTypes.Type.INT4); @@ -195,6 +218,12 @@ public class TestHiveCatalogStore { store.createTable(table.getProto()); assertTrue(store.existTable(DB_NAME, SUPPLIER)); + StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.TEXTFILE); + org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, SUPPLIER); + assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat()); + //IgnoreKeyTextOutputFormat was deprecated + assertEquals(HiveIgnoreKeyTextOutputFormat.class.getName(), hiveTable.getSd().getOutputFormat()); + TableDesc table1 = new TableDesc(store.getTable(DB_NAME, SUPPLIER)); assertEquals(table.getName(), table1.getName()); assertEquals(table.getUri(), table1.getUri()); @@ -470,7 +499,7 @@ public class TestHiveCatalogStore { @Test public void testGetAllTableNames() throws Exception{ - TableMeta meta = new TableMeta("TEXT", new KeyValueSet()); + TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet()); org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); schema.addColumn("n_name", TajoDataTypes.Type.TEXT); schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4); @@ -498,7 +527,7 @@ public class TestHiveCatalogStore { @Test public void testDeleteTable() throws Exception { - TableMeta meta = new TableMeta("TEXT", new KeyValueSet()); + TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet()); org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); schema.addColumn("n_name", TajoDataTypes.Type.TEXT); schema.addColumn("n_regionkey", TajoDataTypes.Type.INT4); @@ -522,7 +551,7 @@ public class TestHiveCatalogStore { public void testTableUsingSequenceFileWithBinarySerde() throws Exception { KeyValueSet options = new KeyValueSet(); options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_BINARY_SERDE); - TableMeta meta = new TableMeta("SEQUENCEFILE", options); + TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options); org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4); @@ -534,6 +563,11 @@ public class TestHiveCatalogStore { store.createTable(table.getProto()); assertTrue(store.existTable(DB_NAME, REGION)); + StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.SEQUENCEFILE); + org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION); + assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat()); + assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat()); + TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); assertEquals(table.getName(), table1.getName()); assertEquals(table.getUri(), table1.getUri()); @@ -551,7 +585,7 @@ public class TestHiveCatalogStore { public void testTableUsingSequenceFileWithTextSerde() throws Exception { KeyValueSet options = new KeyValueSet(); options.set(StorageConstants.SEQUENCEFILE_SERDE, StorageConstants.DEFAULT_TEXT_SERDE); - TableMeta meta = new TableMeta("SEQUENCEFILE", options); + TableMeta meta = new TableMeta(BuiltinStorages.SEQUENCE_FILE, options); org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); schema.addColumn("r_regionkey", TajoDataTypes.Type.INT4); @@ -563,6 +597,11 @@ public class TestHiveCatalogStore { store.createTable(table.getProto()); assertTrue(store.existTable(DB_NAME, REGION)); + StorageFormatDescriptor descriptor = 
formatFactory.get(IOConstants.SEQUENCEFILE); + org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, REGION); + assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat()); + assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat()); + TableDesc table1 = new TableDesc(store.getTable(DB_NAME, REGION)); assertEquals(table.getName(), table1.getName()); assertEquals(table.getUri(), table1.getUri()); @@ -595,6 +634,11 @@ public class TestHiveCatalogStore { store.createTable(table.getProto()); assertTrue(store.existTable(DB_NAME, CUSTOMER)); + StorageFormatDescriptor descriptor = formatFactory.get(IOConstants.PARQUET); + org.apache.hadoop.hive.ql.metadata.Table hiveTable = store.getHiveTable(DB_NAME, CUSTOMER); + assertEquals(descriptor.getInputFormat(), hiveTable.getSd().getInputFormat()); + assertEquals(descriptor.getOutputFormat(), hiveTable.getSd().getOutputFormat()); + TableDesc table1 = new TableDesc(store.getTable(DB_NAME, CUSTOMER)); assertEquals(table.getName(), table1.getName()); assertEquals(table.getUri(), table1.getUri()); @@ -610,7 +654,7 @@ public class TestHiveCatalogStore { public void testDataTypeCompatibility() throws Exception { String tableName = CatalogUtil.normalizeIdentifier("testDataTypeCompatibility"); - TableMeta meta = new TableMeta("TEXT", new KeyValueSet()); + TableMeta meta = new TableMeta(BuiltinStorages.TEXT, new KeyValueSet()); org.apache.tajo.catalog.Schema schema = new org.apache.tajo.catalog.Schema(); schema.addColumn("col1", TajoDataTypes.Type.INT4); http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java ---------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java index ba0c37b..bb053cc 100644 --- a/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java +++ b/tajo-common/src/main/java/org/apache/tajo/storage/StorageConstants.java @@ -85,7 +85,7 @@ public class StorageConstants { public static final String ORC_STRIPE_SIZE = "orc.stripe.size"; public static final String DEFAULT_ORC_STRIPE_SIZE = "67108864"; // 64MB - public static final String ORC_COMPRESSION_KIND = "orc.compression.kind"; + public static final String ORC_COMPRESSION = "orc.compress"; public static final String ORC_COMPRESSION_KIND_NONE = "none"; public static final String ORC_COMPRESSION_KIND_SNAPPY = "snappy"; public static final String ORC_COMPRESSION_KIND_LZO = "lzo"; http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-dist/src/main/conf/tajo-env.cmd ---------------------------------------------------------------------- diff --git a/tajo-dist/src/main/conf/tajo-env.cmd b/tajo-dist/src/main/conf/tajo-env.cmd index f005430..4040a4a 100644 --- a/tajo-dist/src/main/conf/tajo-env.cmd +++ b/tajo-dist/src/main/conf/tajo-env.cmd @@ -68,7 +68,7 @@ set JAVA_HOME=%JAVA_HOME% @rem Tajo cluster mode. the default mode is standby mode. 
 set TAJO_WORKER_STANDBY_MODE=true

-@rem It must be required to use HCatalogStore
+@rem It is required to use HiveCatalogStore
 @rem set HIVE_HOME=
 @rem set HIVE_JDBC_DRIVER_DIR=


http://git-wip-us.apache.org/repos/asf/tajo/blob/ec1337ff/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
index 4544ed3..dbbf5a6 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCAppender.java
@@ -111,7 +111,7 @@ public class ORCAppender extends FileAppender {
   }

   private CompressionKind getCompressionKind() {
-    String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION_KIND, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);
+    String kindstr = meta.getOption(StorageConstants.ORC_COMPRESSION, StorageConstants.DEFAULT_ORC_COMPRESSION_KIND);

     if (kindstr.equalsIgnoreCase(StorageConstants.ORC_COMPRESSION_KIND_ZIP)) {
       return CompressionKind.ZLIB;
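
----------------------------------------------------------------------
Why the Parquet read failed: the old HiveCatalogUtil.getDataFormat() guessed the format from the FILE_OUTPUT_FORMAT table property, which Hive does not set for Parquet tables it creates itself, so Tajo could not map such tables to a data format. The new code derives the format from the table's StorageDescriptor instead. Below is a minimal sketch of that detection logic, condensed from the diff above; the standalone class and method names are placeholders, the LazyBinarySerDe and Avro branches are omitted, and IllegalArgumentException stands in for the real UnknownDataFormatException:

  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  import org.apache.hadoop.hive.ql.io.RCFileInputFormat;
  import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;
  import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe;
  import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
  import org.apache.hadoop.mapred.SequenceFileInputFormat;
  import org.apache.hadoop.mapred.TextInputFormat;

  public final class FormatDetectSketch {
    static String detectFormat(StorageDescriptor sd) {
      String serde = sd.getSerdeInfo().getSerializationLib();
      String inputFormat = sd.getInputFormat();

      if (LazySimpleSerDe.class.getName().equals(serde)) {
        // LazySimpleSerDe backs both TEXT and SEQUENCEFILE tables,
        // so the input format disambiguates them.
        if (TextInputFormat.class.getName().equals(inputFormat)) {
          return "TEXT";
        } else if (SequenceFileInputFormat.class.getName().equals(inputFormat)) {
          return "SEQUENCEFILE";
        }
      } else if (ColumnarSerDe.class.getName().equals(serde)) {
        // A columnar serde combined with RCFileInputFormat identifies RCFile.
        if (RCFileInputFormat.class.getName().equals(inputFormat)) {
          return "RCFILE";
        }
      } else if (ParquetHiveSerDe.class.getName().equals(serde)) {
        // Parquet is identified by its serde alone; no output-format
        // table property is consulted any more.
        return "PARQUET";
      }
      throw new IllegalArgumentException(inputFormat);
    }
  }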
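
----------------------------------------------------------------------
On the DDL side, HiveCatalogStore now obtains the input format, output format, and serde class names for RCFILE, SEQUENCEFILE, and PARQUET from Hive's StorageFormatFactory rather than hard-coding them. A sketch of that pattern, assuming a Hive version that ships StorageFormatFactory and StorageFormatDescriptor; the fresh StorageDescriptor and SerDeInfo here are illustrative, whereas the real code fills in an existing table definition:

  import org.apache.hadoop.hive.metastore.api.SerDeInfo;
  import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
  import org.apache.hadoop.hive.ql.io.IOConstants;
  import org.apache.hadoop.hive.ql.io.StorageFormatDescriptor;
  import org.apache.hadoop.hive.ql.io.StorageFormatFactory;

  public final class DescriptorSketch {
    // For example: newStorageDescriptor(IOConstants.PARQUET)
    public static StorageDescriptor newStorageDescriptor(String format) {
      StorageFormatDescriptor descriptor = new StorageFormatFactory().get(format);

      StorageDescriptor sd = new StorageDescriptor();
      sd.setSerdeInfo(new SerDeInfo());

      // Copy the canonical class names for the chosen format.
      sd.setInputFormat(descriptor.getInputFormat());
      sd.setOutputFormat(descriptor.getOutputFormat());
      sd.getSerdeInfo().setSerializationLib(descriptor.getSerde());
      return sd;
    }
  }

TEXT remains special-cased in the diff because TextFileStorageFormatDescriptor reports a deprecated output format class, so TextInputFormat and HiveIgnoreKeyTextOutputFormat are set directly; the tests above assert exactly that pair.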
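
----------------------------------------------------------------------
Two compression-option cleanups ride along: StorageConstants.ORC_COMPRESSION is renamed to the key ORC itself reads ("orc.compress" instead of "orc.compression.kind"), and a Parquet table's parquet.compression option (ParquetOutputFormat.COMPRESSION) is now copied into the Hive table parameters. A hypothetical usage sketch with the TableMeta and KeyValueSet types from the diff; the option values are illustrative:

  import org.apache.tajo.catalog.TableMeta;
  import org.apache.tajo.util.KeyValueSet;

  public final class CompressionOptionSketch {
    public static void main(String[] args) {
      // ORC: ORCAppender now resolves the codec from "orc.compress"
      // (StorageConstants.ORC_COMPRESSION) instead of "orc.compression.kind".
      KeyValueSet orcOptions = new KeyValueSet();
      orcOptions.set("orc.compress", "snappy");
      TableMeta orcMeta = new TableMeta("ORC", orcOptions);

      // PARQUET: "parquet.compression" (ParquetOutputFormat.COMPRESSION) is
      // now forwarded into the Hive table parameters when the table is created.
      KeyValueSet parquetOptions = new KeyValueSet();
      parquetOptions.set("parquet.compression", "SNAPPY");
      TableMeta parquetMeta = new TableMeta("PARQUET", parquetOptions);
    }
  }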
