Modified: hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java URL: http://svn.apache.org/viewvc/hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java?rev=1618582&r1=1618581&r2=1618582&view=diff ============================================================================== --- hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java (original) +++ hive/trunk/hcatalog/webhcat/java-client/src/test/java/org/apache/hive/hcatalog/api/TestHCatClient.java Mon Aug 18 10:59:56 2014 @@ -21,6 +21,7 @@ package org.apache.hive.hcatalog.api; import java.math.BigInteger; import java.util.ArrayList; import java.util.Arrays; +import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -38,7 +39,7 @@ import org.apache.hadoop.hive.ql.io.orc. import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; +import org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe; import org.apache.hadoop.mapred.TextInputFormat; import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; import org.apache.hive.hcatalog.common.HCatConstants; @@ -65,19 +66,37 @@ public class TestHCatClient { private static final Logger LOG = LoggerFactory.getLogger(TestHCatClient.class); private static final String msPort = "20101"; private static HiveConf hcatConf; + private static boolean isReplicationTargetHCatRunning = false; + private static final String replicationTargetHCatPort = "20102"; + private static HiveConf replicationTargetHCatConf; private static SecurityManager securityManager; private static class RunMS implements Runnable { + private final String msPort; + private List<String> args = new ArrayList<String>(); + + public RunMS(String msPort) { + this.msPort = msPort; + this.args.add("-v"); + this.args.add("-p"); + this.args.add(this.msPort); + } + + public RunMS arg(String arg) { + this.args.add(arg); + return this; + } + @Override public void run() { try { - HiveMetaStore.main(new String[]{"-v", "-p", msPort}); + HiveMetaStore.main(args.toArray(new String[args.size()])); } catch (Throwable t) { LOG.error("Exiting. Got exception from metastore: ", t); } } - } + } // class RunMS; @AfterClass public static void tearDown() throws Exception { @@ -88,9 +107,9 @@ public class TestHCatClient { @BeforeClass public static void startMetaStoreServer() throws Exception { - Thread t = new Thread(new RunMS()); + Thread t = new Thread(new RunMS(msPort)); t.start(); - Thread.sleep(40000); + Thread.sleep(10000); securityManager = System.getSecurityManager(); System.setSecurityManager(new NoExitSecurityManager()); @@ -152,7 +171,7 @@ public class TestHCatClient { assertTrue(table1.getOutputFileFormat().equalsIgnoreCase( RCFileOutputFormat.class.getName())); assertTrue(table1.getSerdeLib().equalsIgnoreCase( - ColumnarSerDe.class.getName())); + LazyBinaryColumnarSerDe.class.getName())); assertTrue(table1.getCols().equals(cols)); // Since "ifexists" was not set to true, trying to create the same table // again @@ -171,8 +190,8 @@ public class TestHCatClient { mapKeysTerminatedBy('\004').collectionItemsTerminatedBy('\005').nullDefinedAs('\006').build(); client.createTable(tableDesc2); HCatTable table2 = client.getTable(db, tableTwo); - assertTrue(table2.getInputFileFormat().equalsIgnoreCase( - TextInputFormat.class.getName())); + assertTrue("Expected TextInputFormat, but got: " + table2.getInputFileFormat(), + table2.getInputFileFormat().equalsIgnoreCase(TextInputFormat.class.getName())); assertTrue(table2.getOutputFileFormat().equalsIgnoreCase( HiveIgnoreKeyTextOutputFormat.class.getName())); assertTrue("SerdeParams not found", table2.getSerdeParams() != null); @@ -222,9 +241,10 @@ public class TestHCatClient { cols.add(new HCatFieldSchema("id", Type.INT, "id comment")); cols.add(new HCatFieldSchema("value", Type.STRING, "value comment")); + client.dropTable(dbName, tblName, true); // Create a minimalistic table client.createTable(HCatCreateTableDesc - .create(dbName, tblName, cols) + .create(new HCatTable(dbName, tblName).cols(cols), false) .build()); HCatTable tCreated = client.getTable(dbName, tblName); @@ -281,21 +301,26 @@ public class TestHCatClient { ptnCols.add(new HCatFieldSchema("dt", Type.STRING, "date column")); ptnCols.add(new HCatFieldSchema("country", Type.STRING, "country column")); - HCatCreateTableDesc tableDesc = HCatCreateTableDesc - .create(dbName, tableName, cols).fileFormat("sequencefile") - .partCols(ptnCols).build(); + HCatTable table = new HCatTable(dbName, tableName).cols(cols) + .partCols(ptnCols) + .fileFormat("sequenceFile"); + HCatCreateTableDesc tableDesc = HCatCreateTableDesc.create(table, false).build(); client.createTable(tableDesc); + // Verify that the table is created successfully. + table = client.getTable(dbName, tableName); + Map<String, String> firstPtn = new HashMap<String, String>(); firstPtn.put("dt", "04/30/2012"); firstPtn.put("country", "usa"); - HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(dbName, - tableName, null, firstPtn).build(); + // Test new HCatAddPartitionsDesc API. + HCatAddPartitionDesc addPtn = HCatAddPartitionDesc.create(new HCatPartition(table, firstPtn, null)).build(); client.addPartition(addPtn); Map<String, String> secondPtn = new HashMap<String, String>(); secondPtn.put("dt", "04/12/2012"); secondPtn.put("country", "brazil"); + // Test deprecated HCatAddPartitionsDesc API. HCatAddPartitionDesc addPtn2 = HCatAddPartitionDesc.create(dbName, tableName, null, secondPtn).build(); client.addPartition(addPtn2); @@ -303,6 +328,7 @@ public class TestHCatClient { Map<String, String> thirdPtn = new HashMap<String, String>(); thirdPtn.put("dt", "04/13/2012"); thirdPtn.put("country", "argentina"); + // Test deprecated HCatAddPartitionsDesc API. HCatAddPartitionDesc addPtn3 = HCatAddPartitionDesc.create(dbName, tableName, null, thirdPtn).build(); client.addPartition(addPtn3); @@ -540,9 +566,8 @@ public class TestHCatClient { List<HCatFieldSchema> columns = Arrays.asList(new HCatFieldSchema("col", Type.STRING, "")); ArrayList<HCatFieldSchema> partitionColumns = new ArrayList<HCatFieldSchema>( Arrays.asList(new HCatFieldSchema(partitionColumn, Type.STRING, ""))); - client.createTable(HCatCreateTableDesc.create(dbName, tableName, columns) - .partCols(partitionColumns) - .build()); + HCatTable table = new HCatTable(dbName, tableName).cols(columns).partCols(partitionColumns); + client.createTable(HCatCreateTableDesc.create(table, false).build()); Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put(partitionColumn, "foobar"); @@ -555,7 +580,7 @@ public class TestHCatClient { exception instanceof ObjectNotFoundException); } - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); // Test that listPartitionsByFilter() returns an empty-set, if the filter selects no partitions. assertEquals("Expected empty set of partitions.", @@ -649,21 +674,26 @@ public class TestHCatClient { List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""), new HCatFieldSchema("grid", Type.STRING, "")); - client.createTable(HCatCreateTableDesc.create(dbName, tableName, columnSchema).partCols(new ArrayList<HCatFieldSchema>(partitionSchema)).build()); + HCatTable table = new HCatTable(dbName, tableName).cols(columnSchema).partCols(partitionSchema); + client.createTable(HCatCreateTableDesc.create(table, false).build()); + + // Verify that the table was created successfully. + table = client.getTable(dbName, tableName); + assertNotNull("The created just now can't be null.", table); Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put("grid", "AB"); partitionSpec.put("dt", "2011_12_31"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); partitionSpec.put("grid", "AB"); partitionSpec.put("dt", "2012_01_01"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); partitionSpec.put("dt", "2012_01_01"); partitionSpec.put("grid", "OB"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); partitionSpec.put("dt", "2012_01_01"); partitionSpec.put("grid", "XB"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); Map<String, String> partialPartitionSpec = new HashMap<String, String>(); partialPartitionSpec.put("dt", "2012_01_01"); @@ -698,21 +728,26 @@ public class TestHCatClient { List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""), new HCatFieldSchema("grid", Type.STRING, "")); - client.createTable(HCatCreateTableDesc.create(dbName, tableName, columnSchema).partCols(new ArrayList<HCatFieldSchema>(partitionSchema)).build()); + HCatTable table = new HCatTable(dbName, tableName).cols(columnSchema).partCols(partitionSchema); + client.createTable(HCatCreateTableDesc.create(table, false).build()); + + // Verify that the table was created successfully. + table = client.getTable(dbName, tableName); + assertNotNull("Table couldn't be queried for. ", table); Map<String, String> partitionSpec = new HashMap<String, String>(); partitionSpec.put("grid", "AB"); partitionSpec.put("dt", "2011_12_31"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); partitionSpec.put("grid", "AB"); partitionSpec.put("dt", "2012_01_01"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); partitionSpec.put("dt", "2012_01_01"); partitionSpec.put("grid", "OB"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); partitionSpec.put("dt", "2012_01_01"); partitionSpec.put("grid", "XB"); - client.addPartition(HCatAddPartitionDesc.create(dbName, tableName, "", partitionSpec).build()); + client.addPartition(HCatAddPartitionDesc.create(new HCatPartition(table, partitionSpec, "")).build()); Map<String, String> partialPartitionSpec = new HashMap<String, String>(); partialPartitionSpec.put("dt", "2012_01_01"); @@ -731,4 +766,235 @@ public class TestHCatClient { } } + private void startReplicationTargetMetaStoreIfRequired() throws Exception { + if (!isReplicationTargetHCatRunning) { + Thread t = new Thread(new RunMS(replicationTargetHCatPort) + .arg("--hiveconf") + .arg("javax.jdo.option.ConnectionURL") // Reset, to use a different Derby instance. + .arg(hcatConf.get("javax.jdo.option.ConnectionURL") + .replace("metastore", "target_metastore"))); + t.start(); + Thread.sleep(10000); + replicationTargetHCatConf = new HiveConf(hcatConf); + replicationTargetHCatConf.setVar(HiveConf.ConfVars.METASTOREURIS, + "thrift://localhost:" + replicationTargetHCatPort); + isReplicationTargetHCatRunning = true; + } + } + + /** + * Test for detecting schema-changes for an HCatalog table, across 2 different HCat instances. + * A table is created with the same schema on 2 HCat instances. The table-schema is modified on the source HCat + * instance (columns, I/O formats, SerDe definitions, etc.). The table metadata is compared between source + * and target, the changes are detected and propagated to target. + * @throws Exception + */ + @Test + public void testTableSchemaPropagation() throws Exception { + try { + startReplicationTargetMetaStoreIfRequired(); + HCatClient sourceMetaStore = HCatClient.create(new Configuration(hcatConf)); + final String dbName = "myDb"; + final String tableName = "myTable"; + + sourceMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + + sourceMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build()); + List<HCatFieldSchema> columnSchema = Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""), + new HCatFieldSchema("bar", Type.STRING, "")); + + List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""), + new HCatFieldSchema("grid", Type.STRING, "")); + + HCatTable sourceTable = new HCatTable(dbName, tableName).cols(columnSchema).partCols(partitionSchema); + sourceMetaStore.createTable(HCatCreateTableDesc.create(sourceTable).build()); + + // Verify that the sourceTable was created successfully. + sourceTable = sourceMetaStore.getTable(dbName, tableName); + assertNotNull("Table couldn't be queried for. ", sourceTable); + + // Serialize Table definition. Deserialize using the target HCatClient instance. + String tableStringRep = sourceMetaStore.serializeTable(sourceTable); + HCatClient targetMetaStore = HCatClient.create(new Configuration(replicationTargetHCatConf)); + targetMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + targetMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build()); + + HCatTable targetTable = targetMetaStore.deserializeTable(tableStringRep); + + assertEquals("Table after deserialization should have been identical to sourceTable.", + sourceTable.diff(targetTable), HCatTable.NO_DIFF); + + // Create table on Target. + targetMetaStore.createTable(HCatCreateTableDesc.create(targetTable).build()); + // Verify that the created table is identical to sourceTable. + targetTable = targetMetaStore.getTable(dbName, tableName); + assertEquals("Table after deserialization should have been identical to sourceTable.", + sourceTable.diff(targetTable), HCatTable.NO_DIFF); + + // Modify sourceTable. + List<HCatFieldSchema> newColumnSchema = new ArrayList<HCatFieldSchema>(columnSchema); + newColumnSchema.add(new HCatFieldSchema("goo_new", Type.DOUBLE, "")); + Map<String, String> tableParams = new HashMap<String, String>(1); + tableParams.put("orc.compress", "ZLIB"); + sourceTable.cols(newColumnSchema) // Add a column. + .fileFormat("orcfile") // Change SerDe, File I/O formats. + .tblProps(tableParams) + .serdeParam(serdeConstants.FIELD_DELIM, Character.toString('\001')); + sourceMetaStore.updateTableSchema(dbName, tableName, sourceTable); + sourceTable = sourceMetaStore.getTable(dbName, tableName); + + // Diff against table on target. + + EnumSet<HCatTable.TableAttribute> diff = targetTable.diff(sourceTable); + assertTrue("Couldn't find change in column-schema.", + diff.contains(HCatTable.TableAttribute.COLUMNS)); + assertTrue("Couldn't find change in InputFormat.", + diff.contains(HCatTable.TableAttribute.INPUT_FORMAT)); + assertTrue("Couldn't find change in OutputFormat.", + diff.contains(HCatTable.TableAttribute.OUTPUT_FORMAT)); + assertTrue("Couldn't find change in SerDe.", + diff.contains(HCatTable.TableAttribute.SERDE)); + assertTrue("Couldn't find change in SerDe parameters.", + diff.contains(HCatTable.TableAttribute.SERDE_PROPERTIES)); + assertTrue("Couldn't find change in Table parameters.", + diff.contains(HCatTable.TableAttribute.TABLE_PROPERTIES)); + + // Replicate the changes to the replicated-table. + targetMetaStore.updateTableSchema(dbName, tableName, targetTable.resolve(sourceTable, diff)); + targetTable = targetMetaStore.getTable(dbName, tableName); + + assertEquals("After propagating schema changes, source and target tables should have been equivalent.", + targetTable.diff(sourceTable), HCatTable.NO_DIFF); + + } + catch (Exception unexpected) { + LOG.error("Unexpected exception!", unexpected); + assertTrue("Unexpected exception! " + unexpected.getMessage(), false); + } + } + + /** + * Test that partition-definitions can be replicated between HCat-instances, + * independently of table-metadata replication. + * 2 identical tables are created on 2 different HCat instances ("source" and "target"). + * On the source instance, + * 1. One partition is added with the old format ("TEXTFILE"). + * 2. The table is updated with an additional column and the data-format changed to ORC. + * 3. Another partition is added with the new format. + * 4. The partitions' metadata is copied to the target HCat instance, without updating the target table definition. + * 5. The partitions' metadata is tested to be an exact replica of that on the source. + * @throws Exception + */ + @Test + public void testPartitionRegistrationWithCustomSchema() throws Exception { + try { + startReplicationTargetMetaStoreIfRequired(); + + HCatClient sourceMetaStore = HCatClient.create(new Configuration(hcatConf)); + final String dbName = "myDb"; + final String tableName = "myTable"; + + sourceMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + + sourceMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build()); + List<HCatFieldSchema> columnSchema = new ArrayList<HCatFieldSchema>( + Arrays.asList(new HCatFieldSchema("foo", Type.INT, ""), + new HCatFieldSchema("bar", Type.STRING, ""))); + + List<HCatFieldSchema> partitionSchema = Arrays.asList(new HCatFieldSchema("dt", Type.STRING, ""), + new HCatFieldSchema("grid", Type.STRING, "")); + + HCatTable sourceTable = new HCatTable(dbName, tableName).cols(columnSchema) + .partCols(partitionSchema) + .comment("Source table."); + + sourceMetaStore.createTable(HCatCreateTableDesc.create(sourceTable).build()); + + // Verify that the sourceTable was created successfully. + sourceTable = sourceMetaStore.getTable(dbName, tableName); + assertNotNull("Table couldn't be queried for. ", sourceTable); + + // Partitions added now should inherit table-schema, properties, etc. + Map<String, String> partitionSpec_1 = new HashMap<String, String>(); + partitionSpec_1.put("grid", "AB"); + partitionSpec_1.put("dt", "2011_12_31"); + HCatPartition sourcePartition_1 = new HCatPartition(sourceTable, partitionSpec_1, ""); + + sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_1).build()); + assertEquals("Unexpected number of partitions. ", + sourceMetaStore.getPartitions(dbName, tableName).size(), 1); + // Verify that partition_1 was added correctly, and properties were inherited from the HCatTable. + HCatPartition addedPartition_1 = sourceMetaStore.getPartition(dbName, tableName, partitionSpec_1); + assertEquals("Column schema doesn't match.", addedPartition_1.getColumns(), sourceTable.getCols()); + assertEquals("InputFormat doesn't match.", addedPartition_1.getInputFormat(), sourceTable.getInputFileFormat()); + assertEquals("OutputFormat doesn't match.", addedPartition_1.getOutputFormat(), sourceTable.getOutputFileFormat()); + assertEquals("SerDe doesn't match.", addedPartition_1.getSerDe(), sourceTable.getSerdeLib()); + assertEquals("SerDe params don't match.", addedPartition_1.getSerdeParams(), sourceTable.getSerdeParams()); + + // Replicate table definition. + + HCatClient targetMetaStore = HCatClient.create(new Configuration(replicationTargetHCatConf)); + targetMetaStore.dropDatabase(dbName, true, HCatClient.DropDBMode.CASCADE); + + targetMetaStore.createDatabase(HCatCreateDBDesc.create(dbName).build()); + // Make a copy of the source-table, as would be done across class-loaders. + HCatTable targetTable = targetMetaStore.deserializeTable(sourceMetaStore.serializeTable(sourceTable)); + targetMetaStore.createTable(HCatCreateTableDesc.create(targetTable).build()); + targetTable = targetMetaStore.getTable(dbName, tableName); + + assertEquals("Created table doesn't match the source.", + targetTable.diff(sourceTable), HCatTable.NO_DIFF); + + // Modify Table schema at the source. + List<HCatFieldSchema> newColumnSchema = new ArrayList<HCatFieldSchema>(columnSchema); + newColumnSchema.add(new HCatFieldSchema("goo_new", Type.DOUBLE, "")); + Map<String, String> tableParams = new HashMap<String, String>(1); + tableParams.put("orc.compress", "ZLIB"); + sourceTable.cols(newColumnSchema) // Add a column. + .fileFormat("orcfile") // Change SerDe, File I/O formats. + .tblProps(tableParams) + .serdeParam(serdeConstants.FIELD_DELIM, Character.toString('\001')); + sourceMetaStore.updateTableSchema(dbName, tableName, sourceTable); + sourceTable = sourceMetaStore.getTable(dbName, tableName); + + // Add another partition to the source. + Map<String, String> partitionSpec_2 = new HashMap<String, String>(); + partitionSpec_2.put("grid", "AB"); + partitionSpec_2.put("dt", "2012_01_01"); + HCatPartition sourcePartition_2 = new HCatPartition(sourceTable, partitionSpec_2, ""); + sourceMetaStore.addPartition(HCatAddPartitionDesc.create(sourcePartition_2).build()); + + // The source table now has 2 partitions, one in TEXTFILE, the other in ORC. + // Test that adding these partitions to the target-table *without* replicating the table-change. + + List<HCatPartition> sourcePartitions = sourceMetaStore.getPartitions(dbName, tableName); + assertEquals("Unexpected number of source partitions.", 2, sourcePartitions.size()); + + List<HCatAddPartitionDesc> addPartitionDescs = new ArrayList<HCatAddPartitionDesc>(sourcePartitions.size()); + for (HCatPartition partition : sourcePartitions) { + addPartitionDescs.add(HCatAddPartitionDesc.create(partition).build()); + } + + targetMetaStore.addPartitions(addPartitionDescs); + + List<HCatPartition> targetPartitions = targetMetaStore.getPartitions(dbName, tableName); + + assertEquals("Expected the same number of partitions. ", targetPartitions.size(), sourcePartitions.size()); + + for (int i=0; i<targetPartitions.size(); ++i) { + HCatPartition sourcePartition = sourcePartitions.get(i), + targetPartition = targetPartitions.get(i); + assertEquals("Column schema doesn't match.", sourcePartition.getColumns(), targetPartition.getColumns()); + assertEquals("InputFormat doesn't match.", sourcePartition.getInputFormat(), targetPartition.getInputFormat()); + assertEquals("OutputFormat doesn't match.", sourcePartition.getOutputFormat(), targetPartition.getOutputFormat()); + assertEquals("SerDe doesn't match.", sourcePartition.getSerDe(), targetPartition.getSerDe()); + assertEquals("SerDe params don't match.", sourcePartition.getSerdeParams(), targetPartition.getSerdeParams()); + } + + } + catch (Exception unexpected) { + LOG.error( "Unexpected exception! ", unexpected); + assertTrue("Unexpected exception! " + unexpected.getMessage(), false); + } + } }