http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java new file mode 100644 index 0000000..1c788a3 --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDRTest.java @@ -0,0 +1,700 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.hive.dr; + +import org.apache.falcon.cli.FalconCLI; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.regression.Entities.ClusterMerlin; +import org.apache.falcon.regression.Entities.RecipeMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.supportClasses.NotifyingAssert; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.HiveAssert; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.OozieUtil; +import org.apache.falcon.regression.core.util.TimeUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.log4j.Logger; +import org.apache.oozie.client.BundleJob; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.CoordinatorJob; +import org.apache.oozie.client.OozieClient; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; +import org.testng.asserts.SoftAssert; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.falcon.regression.core.util.HiveUtil.runSql; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalTable; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createExternalPartitionedTable; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createPartitionedTable; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createSerDeTable; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable; + +/** + * Hive DR Testing. + */ +@Test(groups = "embedded") +public class HiveDRTest extends BaseTestClass { + private static final Logger LOGGER = Logger.getLogger(HiveDRTest.class); + private static final String DB_NAME = "hdr_sdb1"; + private final ColoHelper cluster = servers.get(0); + private final ColoHelper cluster2 = servers.get(1); + private final ColoHelper cluster3 = servers.get(2); + private final FileSystem clusterFS = serverFS.get(0); + private final FileSystem clusterFS2 = serverFS.get(1); + private final FileSystem clusterFS3 = serverFS.get(2); + private final OozieClient clusterOC = serverOC.get(0); + private final OozieClient clusterOC2 = serverOC.get(1); + private final String baseTestHDFSDir = cleanAndGetTestDir() + "/HiveDR/"; + private HCatClient clusterHC; + private HCatClient clusterHC2; + private RecipeMerlin recipeMerlin; + private Connection connection; + private Connection connection2; + + @BeforeMethod(alwaysRun = true) + public void setUp() throws Exception { + clusterHC = cluster.getClusterHelper().getHCatClient(); + clusterHC2 = cluster2.getClusterHelper().getHCatClient(); + bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster); + bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2); + bundles[0].generateUniqueBundle(this); + bundles[1].generateUniqueBundle(this); + final ClusterMerlin srcCluster = bundles[0].getClusterElement(); + final ClusterMerlin tgtCluster = bundles[1].getClusterElement(); + Bundle.submitCluster(bundles[0]); + + if (MerlinConstants.IS_SECURE) { + recipeMerlin = RecipeMerlin.readFromDir("HiveDrSecureRecipe", + FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY) + .withRecipeCluster(srcCluster); + } else { + recipeMerlin = RecipeMerlin.readFromDir("HiveDrRecipe", + FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY) + .withRecipeCluster(srcCluster); + } + recipeMerlin.withSourceCluster(srcCluster) + .withTargetCluster(tgtCluster) + .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes)) + .withValidity(TimeUtil.getTimeWrtSystemTime(-5), TimeUtil.getTimeWrtSystemTime(5)); + recipeMerlin.setUniqueName(this.getClass().getSimpleName()); + + connection = cluster.getClusterHelper().getHiveJdbcConnection(); + runSql(connection, "drop database if exists hdr_sdb1 cascade"); + runSql(connection, "create database hdr_sdb1"); + runSql(connection, "use hdr_sdb1"); + + connection2 = cluster2.getClusterHelper().getHiveJdbcConnection(); + runSql(connection2, "drop database if exists hdr_sdb1 cascade"); + runSql(connection2, "create database hdr_sdb1"); + runSql(connection2, "use hdr_sdb1"); + } + + @Test + public void drPartition() throws Exception { + final String tblName = "partitionDR"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + runSql(connection, + "create table " + tblName + "(comment string) partitioned by (pname string)"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'DELETE') values" + + "('this partition is going to be deleted - should NOT appear after dr')"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'REPLACE') values" + + "('this partition is going to be replaced - should NOT appear after dr')"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'ADD_DATA') values" + + "('this partition will have more data - should appear after dr')"); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + runSql(connection, + "insert into table " + tblName + " partition (pname = 'NEW_PART') values" + + "('this partition has been added post bootstrap - should appear after dr')"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'ADD_DATA') values" + + "('more data has been added post bootstrap - should appear after dr')"); + runSql(connection, + "alter table " + tblName + " drop partition(pname = 'DELETE')"); + runSql(connection, + "alter table " + tblName + " drop partition(pname = 'REPLACE')"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'REPLACE') values" + + "('this partition has been replaced - should appear after dr')"); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + @Test + public void drInsertOverwritePartition() throws Exception { + final String tblName = "drInsertOverwritePartition"; + final String hlpTblName = "drInsertOverwritePartitionHelperTbl"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + runSql(connection, "create table " + hlpTblName + "(comment string)"); + runSql(connection, + "insert into table " + hlpTblName + + " values('overwrite data - should appear after dr')"); + runSql(connection, + "insert into table " + hlpTblName + " values('newdata row2 - should appear after dr')"); + runSql(connection, + "insert into table " + hlpTblName + " values('newdata row1 - should appear after dr')"); + + runSql(connection, + "create table " + tblName + "(comment string) partitioned by (pname string)"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'OLD_PART') values" + + "('this data should be retained - should appear after dr')"); + runSql(connection, + "insert into table " + tblName + " partition (pname = 'OVERWRITE_PART') values" + + "('this data should get overwritten - should NOT appear after dr')"); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + runSql(connection, + "insert overwrite table " + tblName + " partition (pname = 'OVERWRITE_PART') " + + "select * from " + hlpTblName + " where comment REGEXP '^overwrite'"); + runSql(connection, + "insert overwrite table " + tblName + " partition (pname = 'NEW_DATA') " + + "select * from " + hlpTblName + " where comment REGEXP '^newdata'"); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + @Test + public void drTwoTablesOneRequest() throws Exception { + final String tblName = "firstTableDR"; + final String tbl2Name = "secondTableDR"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName + ',' + tbl2Name); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + runSql(connection, + "create table " + tblName + "(comment string)"); + runSql(connection, + "create table " + tbl2Name + "(comment string)"); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + bootstrapCopy(connection, clusterFS, tbl2Name, connection2, clusterFS2, tbl2Name); + + runSql(connection, + "insert into table " + tblName + " values" + + "('this string has been added post bootstrap - should appear after dr')"); + runSql(connection, + "insert into table " + tbl2Name + " values" + + "('this string has been added post bootstrap - should appear after dr')"); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + final NotifyingAssert anAssert = new NotifyingAssert(true); + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert); + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tbl2Name), + cluster2, clusterHC2.getTable(DB_NAME, tbl2Name), anAssert); + anAssert.assertAll(); + + } + + @Test + public void drSerDeWithProperties() throws Exception { + final String tblName = "serdeTable"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + runSql(connection, + "create table " + tblName + "(comment string) " + + "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe'"); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + runSql(connection, + "insert into table " + tblName + " values" + + "('this string has been added post bootstrap - should appear after dr')"); + + runSql(connection, + "ALTER TABLE " + tblName + " SET SERDEPROPERTIES ('someProperty' = 'value')"); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + + } + + @Test + public void drChangeColumn() throws Exception { + final String tblName = "tableForColumnChange"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command1 = recipeMerlin.getSubmissionCommand(); + final String recipe1Name = recipeMerlin.getName(); + runSql(connection, + "create table " + tblName + "(id int)"); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed."); + runSql(connection, + "ALTER TABLE " + tblName + " CHANGE id id STRING COMMENT 'some_comment'"); + + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipe1Name, 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + + @Test + public void drTwoDstTablesTwoRequests() throws Exception { + final HCatClient clusterHC3 = cluster3.getClusterHelper().getHCatClient(); + final Connection connection3 = cluster3.getClusterHelper().getHiveJdbcConnection(); + runSql(connection3, "drop database if exists hdr_sdb1 cascade"); + runSql(connection3, "create database hdr_sdb1"); + runSql(connection3, "use hdr_sdb1"); + + final String tblName = "vanillaTable"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final String recipe1Name = recipeMerlin.getName(); + final List<String> command1 = recipeMerlin.getSubmissionCommand(); + + final Bundle bundle = BundleUtil.readHCatBundle(); + bundle.generateUniqueBundle(this); + recipeMerlin.withTargetCluster(new Bundle(bundle, cluster3).getClusterElement()); + recipeMerlin.setUniqueName(this.getClass().getSimpleName()); + + final List<String> command2 = recipeMerlin.getSubmissionCommand(); + final String recipe2Name = recipeMerlin.getName(); + + runSql(connection, "create table " + tblName + "(comment string)"); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + bootstrapCopy(connection, clusterFS, tblName, connection3, clusterFS3, tblName); + + runSql(connection, + "insert into table " + tblName + " values" + + "('this string has been added post bootstrap - should appear after dr')"); + + Assert.assertEquals(Bundle.runFalconCLI(command1), 0, "Recipe submission failed."); + Assert.assertEquals(Bundle.runFalconCLI(command2), 0, "Recipe submission failed."); + + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipe1Name, 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, recipe2Name, 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + final NotifyingAssert anAssert = new NotifyingAssert(true); + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert); + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster3, clusterHC3.getTable(DB_NAME, tblName), anAssert); + anAssert.assertAll(); + } + + @Test + public void drExternalToNonExternal() throws Exception { + final String tblName = "externalToNonExternal"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + createExternalTable(connection, clusterFS, baseTestHDFSDir + "click_data/", tblName); + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + //change column name + runSql(connection, + "alter table " + tblName + " change column data data_new string"); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + final NotifyingAssert anAssert = new NotifyingAssert(true); + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false); + anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(), + clusterHC.getTable(DB_NAME, tblName).getTableName(), + "Source and destination tables should have different Tabletype"); + anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"), + clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"), + "Source and destination tables should have different value of property EXTERNAL"); + anAssert.assertAll(); + } + + @Test + public void drExtPartitionedToNonExtPartitioned() throws Exception { + final String tblName = "extPartitionedToNonExtPartitioned"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + createExternalPartitionedTable(connection, clusterFS, + baseTestHDFSDir + "click_data/", tblName); + runSql(connection2, + "create table " + tblName + " (data string, time string) partitioned by (date_ string)"); + runSql(connection2, "alter table " + tblName + " add partition " + + "(date_='2001-01-01') location '" + baseTestHDFSDir + "click_data/2001-01-01/'"); + runSql(connection2, "alter table " + tblName + " add partition " + + "(date_='2001-01-02') location '" + baseTestHDFSDir + "click_data/2001-01-02/'"); + + runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-01') " + + "values ('click1', '01:01:01')"); + runSql(connection2, "insert into table " + tblName + " partition (date_='2001-01-02') " + + "values ('click2', '02:02:02')"); + + final NotifyingAssert anAssert = new NotifyingAssert(true); + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false); + + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + //change column name + runSql(connection, + "alter table " + tblName + " change column data data_new string"); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), anAssert, false); + anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTabletype(), + clusterHC.getTable(DB_NAME, tblName).getTableName(), + "Source and destination tables should have different Tabletype"); + anAssert.assertNotEquals(clusterHC2.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"), + clusterHC.getTable(DB_NAME, tblName).getTblProps().get("EXTERNAL"), + "Source and destination tables should have different value of property EXTERNAL"); + anAssert.assertAll(); + } + + /** + * 1 src tbl 1 dst tbl. Change table properties and comment at the source. + * Changes should get reflected at destination. + */ + @Test + public void drChangeCommentAndPropertyTest() throws Exception { + final String tblName = "myTable"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + runSql(connection, "create table " + tblName + "(field string)"); + //add new table property + runSql(connection, + "ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'initialValue')"); + //set comment + runSql(connection, + "ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment will be " + + "changed, SHOULD NOT appear')"); + + LOGGER.info(tblName + " before bootstrap copy: "); + runSql(connection, "describe extended " + tblName); + + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + //change table property and comment + runSql(connection, + "ALTER TABLE " + tblName + " SET TBLPROPERTIES('someProperty' = 'anotherValue')"); + runSql(connection, + "ALTER TABLE " + tblName + " SET TBLPROPERTIES('comment' = 'this comment should " + + "appear after replication done')"); + + LOGGER.info(tblName + " after modifications, before replication: "); + runSql(connection, "describe extended " + tblName); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + @Test + public void dataGeneration() throws Exception { + runSql(connection, "use hdr_sdb1"); + createVanillaTable(connection, "store_sales"); + createSerDeTable(connection); + createPartitionedTable(connection); + createExternalTable(connection, clusterFS, + baseTestHDFSDir + "click_data/", "click_data"); + createExternalPartitionedTable(connection, clusterFS, + baseTestHDFSDir + "click_data2/", "click_data2"); + + runSql(connection2, "use hdr_sdb1"); + createVanillaTable(connection2, "store_sales"); + createSerDeTable(connection2); + createPartitionedTable(connection2); + createExternalTable(connection2, clusterFS2, + baseTestHDFSDir + "click_data/", "click_data"); + createExternalPartitionedTable(connection2, clusterFS2, + baseTestHDFSDir + "click_data2/", "click_data2"); + + final NotifyingAssert anAssert = new NotifyingAssert(true); + HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("hdr_sdb1"), + cluster2, clusterHC2.getDatabase("hdr_sdb1"), anAssert); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable("hdr_sdb1", "click_data"), + cluster2, clusterHC2.getTable("hdr_sdb1", "click_data"), anAssert); + anAssert.assertAll(); + + } + + @Test(enabled = false) + public void assertionTest() throws Exception { + final SoftAssert anAssert = new SoftAssert(); + HiveAssert.assertTableEqual( + cluster, clusterHC.getTable("default", "hcatsmoke10546"), + cluster2, clusterHC2.getTable("default", "hcatsmoke10548"), anAssert); + HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase("default"), cluster2, + clusterHC2.getDatabase("default"), anAssert); + anAssert.assertAll(); + } + + /** + * Test creates a table on first cluster using static partitioning. Then it creates the same + * table on the second cluster using dynamic partitioning. Finally it checks the equality of + * these tables. + * @throws SQLException + * @throws IOException + */ + @Test + public void dynamicPartitionsTest() throws SQLException, IOException { + //create table with static partitions on first cluster + createPartitionedTable(connection, false); + + //create table with dynamic partitions on second cluster + createPartitionedTable(connection2, true); + + //check that both tables are equal + HiveAssert.assertTableEqual( + cluster, clusterHC.getTable("hdr_sdb1", "global_store_sales"), + cluster2, clusterHC2.getTable("hdr_sdb1", "global_store_sales"), new SoftAssert() + ).assertAll(); + } + + /** + * 1 src tbl 1 dst tbl replication. Insert/delete/replace partitions using dynamic partition + * queries. The changes should get reflected at destination. + */ + @Test + public void drInsertDropReplaceDynamicPartition() throws Exception { + final String tblName = "dynamicPartitionDR"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + //disable strict mode to use only dynamic partition + runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict"); + + runSql(connection, + "create table " + tblName + "(comment string) partitioned by (pname string)"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this partition is going to be deleted - should NOT appear after dr', 'DELETE')"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this partition is going to be replaced - should NOT appear after dr', 'REPLACE')"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this partition will have more data - should appear after dr', 'ADD_DATA')"); + + LOGGER.info(tblName + " before bootstrap copying: "); + runSql(connection, "select * from " + tblName); + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this partition has been added post bootstrap - should appear after dr', 'NEW_PART')"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('more data has been added post bootstrap - should appear after dr', 'ADD_DATA')"); + runSql(connection, + "alter table " + tblName + " drop partition(pname = 'DELETE')"); + runSql(connection, + "alter table " + tblName + " drop partition(pname = 'REPLACE')"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this partition has been replaced - should appear after dr', 'REPLACE')"); + + LOGGER.info(tblName + " after modifications, before replication: "); + runSql(connection, "select * from " + tblName); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + /** + * 1 src tbl 1 dst tbl replication. Insert/overwrite partitions using dynamic partitions + * queries. The changes should get reflected at destination. + * @throws Exception + */ + @Test + public void drInsertOverwriteDynamicPartition() throws Exception { + final String tblName = "drInsertOverwritePartition"; + final String hlpTblName = "drInsertOverwritePartitionHelperTbl"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + //disable strict mode to use only dynamic partition + runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict"); + + runSql(connection, + "create table " + hlpTblName + "(comment string) partitioned by (pname string)"); + runSql(connection, + "insert into table " + hlpTblName + " partition (pname)" + + " values('overwrite data - should appear after dr', 'OVERWRITE_PART')"); + runSql(connection, + "insert into table " + hlpTblName + " partition (pname)" + + " values('newdata row2 - should appear after dr', 'NEW_DATA')"); + runSql(connection, + "insert into table " + hlpTblName + " partition (pname)" + + " values('newdata row1 - should appear after dr', 'NEW_DATA')"); + + runSql(connection, + "create table " + tblName + "(comment string) partitioned by (pname string)"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this data should be retained - should appear after dr', 'OLD_PART')"); + runSql(connection, + "insert into table " + tblName + " partition (pname) values" + + "('this data should get overwritten - should NOT appear after dr', 'OVERWRITE_PART')"); + + LOGGER.info(tblName + " before bootstrap copying: "); + runSql(connection, "select * from " + tblName); + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + runSql(connection, + "insert overwrite table " + tblName + " partition (pname) " + + "select comment, pname from " + hlpTblName + " where comment REGEXP '^overwrite'"); + runSql(connection, + "insert overwrite table " + tblName + " partition (pname) " + + "select comment, pname from " + hlpTblName + " where comment REGEXP '^newdata'"); + + LOGGER.info(tblName + " after modifications, before replication: "); + runSql(connection, "select * from " + tblName); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(DB_NAME, tblName), + cluster2, clusterHC2.getTable(DB_NAME, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + /** + * Run recipe with different frequencies. Submission should go through. + * Check frequency of the launched oozie job + */ + @Test(dataProvider = "frequencyGenerator") + public void differentRecipeFrequenciesTest(String frequency) throws Exception { + LOGGER.info("Testing with frequency: " + frequency); + String tblName = "myTable"; + recipeMerlin.withSourceDb(DB_NAME).withSourceTable(tblName) + .withFrequency(new Frequency(frequency)); + runSql(connection, "create table " + tblName + "(comment string)"); + final List<String> command = recipeMerlin.getSubmissionCommand(); + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + LOGGER.info("Submission went through."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + String filter = "name=FALCON_PROCESS_" + recipeMerlin.getName(); + List<BundleJob> bundleJobs = OozieUtil.getBundles(clusterOC, filter, 0, 10); + List<String> bundleIds = OozieUtil.getBundleIds(bundleJobs); + String bundleId = OozieUtil.getMaxId(bundleIds); + List<CoordinatorJob> coords = clusterOC.getBundleJobInfo(bundleId).getCoordinators(); + List<String> cIds = new ArrayList<String>(); + for (CoordinatorJob coord : coords) { + cIds.add(coord.getId()); + } + String coordId = OozieUtil.getMinId(cIds); + CoordinatorJob job = clusterOC.getCoordJobInfo(coordId); + CoordinatorJob.Timeunit timeUnit = job.getTimeUnit(); + String freq = job.getFrequency(); + LOGGER.info("Frequency of running job: " + timeUnit + " " + freq); + Assert.assertTrue(frequency.contains(timeUnit.name().toLowerCase().replace("_", "")) + && frequency.contains(freq), "Running job has different frequency."); + } + + @DataProvider(name = "frequencyGenerator") + public Object[][] frequencyGenerator() { + return new Object[][]{{"minutes(10)"}, {"minutes(10000)"}, + {"days(3)"}, {"days(3000)"}, {"months(1)"}, {"months(1000)"}, }; + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + try { + prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null); + } catch (Exception e) { + LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e); + } + removeTestClassEntities(); + cleanTestsDirs(); + } + +}
http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java new file mode 100644 index 0000000..a64bd6d --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveDbDRTest.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.hive.dr; + +import org.apache.falcon.cli.FalconCLI; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.regression.Entities.ClusterMerlin; +import org.apache.falcon.regression.Entities.RecipeMerlin; +import org.apache.falcon.regression.core.bundle.Bundle; +import org.apache.falcon.regression.core.enumsAndConstants.MerlinConstants; +import org.apache.falcon.regression.core.helpers.ColoHelper; +import org.apache.falcon.regression.core.supportClasses.NotifyingAssert; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.Config; +import org.apache.falcon.regression.core.util.HadoopUtil; +import org.apache.falcon.regression.core.util.HiveAssert; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.TimeUtil; +import org.apache.falcon.regression.testHelper.BaseTestClass; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.log4j.Logger; +import org.apache.oozie.client.CoordinatorAction; +import org.apache.oozie.client.OozieClient; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import static org.apache.falcon.regression.core.util.HiveUtil.runSql; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.bootstrapCopy; +import static org.apache.falcon.regression.hive.dr.HiveObjectCreator.createVanillaTable; + +/** + * Hive DR Testing for Hive database replication. + */ +@Test(groups = "embedded") +public class HiveDbDRTest extends BaseTestClass { + private static final Logger LOGGER = Logger.getLogger(HiveDbDRTest.class); + private final ColoHelper cluster = servers.get(0); + private final ColoHelper cluster2 = servers.get(1); + private final FileSystem clusterFS = serverFS.get(0); + private final FileSystem clusterFS2 = serverFS.get(1); + private final OozieClient clusterOC = serverOC.get(0); + private HCatClient clusterHC; + private HCatClient clusterHC2; + private RecipeMerlin recipeMerlin; + private Connection connection; + private Connection connection2; + + @BeforeMethod(alwaysRun = true) + public void setUp() throws Exception { + clusterHC = cluster.getClusterHelper().getHCatClient(); + clusterHC2 = cluster2.getClusterHelper().getHCatClient(); + bundles[0] = new Bundle(BundleUtil.readHCatBundle(), cluster); + bundles[1] = new Bundle(BundleUtil.readHCatBundle(), cluster2); + bundles[0].generateUniqueBundle(this); + bundles[1].generateUniqueBundle(this); + final ClusterMerlin srcCluster = bundles[0].getClusterElement(); + final ClusterMerlin tgtCluster = bundles[1].getClusterElement(); + Bundle.submitCluster(bundles[0]); + + if (MerlinConstants.IS_SECURE) { + recipeMerlin = RecipeMerlin.readFromDir("HiveDrSecureRecipe", + FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY) + .withRecipeCluster(srcCluster); + } else { + recipeMerlin = RecipeMerlin.readFromDir("HiveDrRecipe", + FalconCLI.RecipeOperation.HIVE_DISASTER_RECOVERY) + .withRecipeCluster(srcCluster); + } + recipeMerlin.withSourceCluster(srcCluster) + .withTargetCluster(tgtCluster) + .withFrequency(new Frequency("5", Frequency.TimeUnit.minutes)) + .withValidity(TimeUtil.getTimeWrtSystemTime(-1), TimeUtil.getTimeWrtSystemTime(11)); + recipeMerlin.setUniqueName(this.getClass().getSimpleName()); + + connection = cluster.getClusterHelper().getHiveJdbcConnection(); + + connection2 = cluster2.getClusterHelper().getHiveJdbcConnection(); + } + + private void setUpDb(String dbName, Connection conn) throws SQLException { + runSql(conn, "drop database if exists " + dbName + " cascade"); + runSql(conn, "create database " + dbName); + runSql(conn, "use " + dbName); + } + + @Test + public void drDbDropDb() throws Exception { + final String dbName = "drDbDropDb"; + setUpDb(dbName, connection); + setUpDb(dbName, connection2); + recipeMerlin.withSourceDb(dbName).withSourceTable("*"); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + runSql(connection, "drop database " + dbName); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + final List<String> dstDbs = runSql(connection2, "show databases"); + Assert.assertFalse(dstDbs.contains(dbName), "dstDbs = " + dstDbs + " was not expected to " + + "contain " + dbName); + } + + + @Test(dataProvider = "isDBReplication") + public void drDbFailPass(Boolean isDBReplication) throws Exception { + final String dbName = "drDbFailPass"; + final String tblName = "vanillaTable"; + final String hiveWarehouseLocation = Config.getProperty("hive.warehouse.location", "/apps/hive/warehouse/"); + final String dbPath = HadoopUtil.joinPath(hiveWarehouseLocation, dbName.toLowerCase() + ".db"); + setUpDb(dbName, connection); + runSql(connection, "create table " + tblName + "(data string)"); + setUpDb(dbName, connection2); + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + + recipeMerlin.withSourceDb(dbName).withSourceTable(isDBReplication ? "*" : tblName); + + final List<String> command = recipeMerlin.getSubmissionCommand(); + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + runSql(connection, "insert into table " + tblName + " values('cannot be replicated now')"); + final String noReadWritePerm = "d---r-xr-x"; + LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + noReadWritePerm); + clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(noReadWritePerm)); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.KILLED, EntityType.PROCESS); + + final String readWritePerm = "drwxr-xr-x"; + LOGGER.info("Setting " + clusterFS2.getUri() + dbPath + " to : " + readWritePerm); + clusterFS2.setPermission(new Path(dbPath), FsPermission.valueOf(readWritePerm)); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName), + cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true) + ).assertAll(); + } + + @Test + public void drDbAddDropTable() throws Exception { + final String dbName = "drDbAddDropTable"; + final String tblToBeDropped = "table_to_be_dropped"; + final String tblToBeDroppedAndAdded = "table_to_be_dropped_and_readded"; + final String newTableToBeAdded = "new_table_to_be_added"; + + setUpDb(dbName, connection); + setUpDb(dbName, connection2); + recipeMerlin.withSourceDb(dbName).withSourceTable("*") + .withFrequency(new Frequency("2", Frequency.TimeUnit.minutes)); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + createVanillaTable(connection, tblToBeDropped); + createVanillaTable(connection, tblToBeDroppedAndAdded); + bootstrapCopy(connection, clusterFS, tblToBeDropped, + connection2, clusterFS2, tblToBeDropped); + bootstrapCopy(connection, clusterFS, tblToBeDroppedAndAdded, + connection2, clusterFS2, tblToBeDroppedAndAdded); + + /* For first replication - two tables are dropped & one table is added */ + runSql(connection, "drop table " + tblToBeDropped); + runSql(connection, "drop table " + tblToBeDroppedAndAdded); + createVanillaTable(connection, newTableToBeAdded); + + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + final NotifyingAssert anAssert = new NotifyingAssert(true); + HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName), + cluster2, clusterHC2.getDatabase(dbName), anAssert); + + /* For second replication - a dropped tables is added back */ + createVanillaTable(connection, tblToBeDroppedAndAdded); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 2, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + HiveAssert.assertDbEqual(cluster, clusterHC.getDatabase(dbName), + cluster2, clusterHC2.getDatabase(dbName), anAssert); + anAssert.assertAll(); + } + + @Test(enabled = false) + public void drDbNonReplicatableTable() throws Exception { + final String dbName = "drDbNonReplicatableTable"; + final String tblName = "vanillaTable"; + final String tblView = "vanillaTableView"; + final String tblOffline = "offlineTable"; + + setUpDb(dbName, connection); + setUpDb(dbName, connection2); + recipeMerlin.withSourceDb(dbName).withSourceTable("*") + .withFrequency(new Frequency("2", Frequency.TimeUnit.minutes)); + final List<String> command = recipeMerlin.getSubmissionCommand(); + + createVanillaTable(connection, tblName); + runSql(connection, "create view " + tblView + " as select * from " + tblName); + createVanillaTable(connection, tblOffline); + bootstrapCopy(connection, clusterFS, tblName, connection2, clusterFS2, tblName); + bootstrapCopy(connection, clusterFS, tblOffline, connection2, clusterFS2, tblOffline); + final String newComment = "'new comment for offline table should not reach destination'"; + runSql(connection, + "alter table " + tblOffline + " set tblproperties ('comment' =" + newComment +")"); + runSql(connection, "alter table " + tblOffline + " enable offline"); + Assert.assertEquals(Bundle.runFalconCLI(command), 0, "Recipe submission failed."); + + InstanceUtil.waitTillInstanceReachState(clusterOC, recipeMerlin.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.PROCESS); + + //vanilla table gets replicated, offline table & view are not replicated + HiveAssert.assertTableEqual(cluster, clusterHC.getTable(dbName, tblName), + cluster2, clusterHC2.getTable(dbName, tblName), new NotifyingAssert(true)).assertAll(); + final List<String> dstTables = runSql(connection2, "show tables"); + Assert.assertFalse(dstTables.contains(tblView), + "dstTables = " + dstTables + " was not expected to contain " + tblView); + final List<String> dstComment = + runSql(connection2, "show tblproperties " + tblOffline + "('comment')"); + Assert.assertFalse(dstComment.contains(newComment), + tblOffline + " comment = " + dstComment + " was not expected to contain " + newComment); + } + + @AfterMethod(alwaysRun = true) + public void tearDown() throws IOException { + try { + prism.getProcessHelper().deleteByName(recipeMerlin.getName(), null); + } catch (Exception e) { + LOGGER.info("Deletion of process: " + recipeMerlin.getName() + " failed with exception: " + e); + } + removeTestClassEntities(); + cleanTestsDirs(); + } + + @DataProvider + public Object[][] isDBReplication() { + return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}}; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java new file mode 100644 index 0000000..9eb389a --- /dev/null +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/hive/dr/HiveObjectCreator.java @@ -0,0 +1,208 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.regression.hive.dr; + +import org.apache.falcon.regression.core.util.HadoopUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; + +import static org.apache.falcon.regression.core.util.HadoopUtil.writeDataForHive; +import static org.apache.falcon.regression.core.util.HiveUtil.runSql; + +/** + * Create Hive tables for testing Hive DR. Note that this is not expected to be used out of + * HiveDR tests. + */ +final class HiveObjectCreator { + private static final Logger LOGGER = Logger.getLogger(HiveObjectCreator.class); + private static final String HDFS_TMP_DIR = "/tmp/hive_objects/"; + + private HiveObjectCreator() { + throw new AssertionError("Instantiating utility class..."); + } + + static void bootstrapCopy(Connection srcConnection, FileSystem srcFs, String srcTable, + Connection dstConnection, FileSystem dstFs, String dstTable) throws Exception { + LOGGER.info("Starting bootstrap..."); + final String dumpPath = HDFS_TMP_DIR + srcTable + "/"; + HadoopUtil.recreateDir(srcFs, dumpPath); + runSqlQuietly(srcConnection, "dfs -chmod -R 777 " + dumpPath); + HadoopUtil.deleteDirIfExists(dumpPath, dstFs); + runSql(srcConnection, "export table " + srcTable + " to '" + dumpPath + "' FOR REPLICATION('ignore')"); + FileUtil.copy(srcFs, new Path(dumpPath), dstFs, new Path(dumpPath), false, true, new Configuration()); + runSqlQuietly(dstConnection, "dfs -chmod -R 777 " + dumpPath); + runSql(dstConnection, "import table " + dstTable + " from '" + dumpPath + "'"); + HadoopUtil.deleteDirIfExists(dumpPath, srcFs); + HadoopUtil.deleteDirIfExists(dumpPath, dstFs); + LOGGER.info("Finished bootstrap"); + } + + /* We need to delete it using hive query as the created directory is owned by hive.*/ + private static void runSqlQuietly(Connection srcConnection, String sql) { + try { + runSql(srcConnection, sql); + } catch (SQLException ex) { + LOGGER.info("Exception while hive ql execution: " + ex.getMessage()); + } + } + + /** + * Create an external table. + * @param connection jdbc connection object to use for issuing queries to hive + * @param fs filesystem object to upload the data + * @param clickDataLocation location to upload the data to + * @throws IOException + * @throws SQLException + */ + static void createExternalTable(Connection connection, FileSystem fs, String + clickDataLocation, String tableName) throws IOException, SQLException { + HadoopUtil.deleteDirIfExists(clickDataLocation, fs); + fs.mkdirs(new Path(clickDataLocation)); + fs.setPermission(new Path(clickDataLocation), FsPermission.getDirDefault()); + writeDataForHive(fs, clickDataLocation, + new StringBuffer("click1").append((char) 0x01).append("01:01:01").append("\n") + .append("click2").append((char) 0x01).append("02:02:02"), true); + //clusterFS.setPermission(new Path(clickDataPart2), FsPermission.getFileDefault()); + runSql(connection, "create external table " + tableName + + " (data string, time string) " + + "location '" + clickDataLocation + "'"); + runSql(connection, "select * from " + tableName); + } + + + /** + * Create an external table. + * @param connection jdbc connection object to use for issuing queries to hive + * @param fs filesystem object to upload the data + * @param clickDataLocation location to upload the data to + * @throws IOException + * @throws SQLException + */ + static void createExternalPartitionedTable(Connection connection, FileSystem fs, String + clickDataLocation, String tableName) throws IOException, SQLException { + final String clickDataPart1 = clickDataLocation + "2001-01-01/"; + final String clickDataPart2 = clickDataLocation + "2001-01-02/"; + fs.mkdirs(new Path(clickDataLocation)); + fs.setPermission(new Path(clickDataLocation), FsPermission.getDirDefault()); + writeDataForHive(fs, clickDataPart1, + new StringBuffer("click1").append((char) 0x01).append("01:01:01"), true); + writeDataForHive(fs, clickDataPart2, + new StringBuffer("click2").append((char) 0x01).append("02:02:02"), true); + //clusterFS.setPermission(new Path(clickDataPart2), FsPermission.getFileDefault()); + runSql(connection, "create external table " + tableName + + " (data string, time string) partitioned by (date_ string) " + + "location '" + clickDataLocation + "'"); + runSql(connection, "alter table " + tableName + " add partition " + + "(date_='2001-01-01') location '" + clickDataPart1 + "'"); + runSql(connection, "alter table " + tableName + " add partition " + + "(date_='2001-01-02') location '" + clickDataPart2 + "'"); + runSql(connection, "select * from " + tableName); + } + + /** + * Create an partitioned table. + * @param connection jdbc connection object to use for issuing queries to hive + * @throws SQLException + */ + static void createPartitionedTable(Connection connection) throws SQLException { + runSql(connection, "create table global_store_sales " + + "(customer_id string, item_id string, quantity float, price float, time timestamp) " + + "partitioned by (country string)"); + runSql(connection, + "insert into table global_store_sales partition (country = 'us') values" + + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01')"); + runSql(connection, + "insert into table global_store_sales partition (country = 'uk') values" + + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')"); + runSql(connection, "select * from global_store_sales"); + } + + /** + * Create an plain old table. + * @param connection jdbc connection object to use for issuing queries to hive + * @param tblName + * @throws SQLException + */ + static void createVanillaTable(Connection connection, String tblName) throws SQLException { + //vanilla table + runSql(connection, "create table " + tblName + + "(customer_id string, item_id string, quantity float, price float, time timestamp)"); + runSql(connection, "insert into table " + tblName + " values " + + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01'), " + + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')"); + runSql(connection, "select * from " + tblName); + } + + /** + * Create a partitioned table with either dynamic or static partitions. + * @param connection jdbc connection object to use for issuing queries to hive + * @param dynamic should partitions be added in dynamic or static way + * @throws SQLException + */ + static void createPartitionedTable(Connection connection, + boolean dynamic) throws SQLException { + String [][] partitions = { + {"us", "Kansas", }, + {"us", "California", }, + {"au", "Queensland", }, + {"au", "Victoria", }, + }; + //create table + runSql(connection, "drop table global_store_sales"); + runSql(connection, "create table global_store_sales(customer_id string," + + " item_id string, quantity float, price float, time timestamp) " + + "partitioned by (country string, state string)"); + //provide data + String query; + if (dynamic) { + //disable strict mode, thus both partitions can be used as dynamic + runSql(connection, "set hive.exec.dynamic.partition.mode=nonstrict"); + query = "insert into table global_store_sales partition" + + "(country, state) values('c%3$s', 'i%3$s', '%3$s', '%3$s', " + + "'2001-01-01 01:01:0%3$s', '%1$s', '%2$s')"; + } else { + query = "insert into table global_store_sales partition" + + "(country = '%1$s', state = '%2$s') values('c%3$s', 'i%3$s', '%3$s', '%3$s', " + + "'2001-01-01 01:01:0%3$s')"; + } + for (int i = 0; i < partitions.length; i++) { + runSql(connection, String.format(query, partitions[i][0], partitions[i][1], i + 1)); + } + runSql(connection, "select * from global_store_sales"); + } + + static void createSerDeTable(Connection connection) throws SQLException { + runSql(connection, "create table store_json " + + "(customer_id string, item_id string, quantity float, price float, time timestamp) " + + "row format serde 'org.apache.hive.hcatalog.data.JsonSerDe' "); + runSql(connection, "insert into table store_json values " + + "('c1', 'i1', '1', '1', '2001-01-01 01:01:01'), " + + "('c2', 'i2', '2', '2', '2001-01-01 01:01:02')"); + runSql(connection, "select * from store_json"); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java index 8ef6bb6..7ad4c8e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListFeedInstancesTest.java @@ -88,7 +88,7 @@ public class ListFeedInstancesTest extends BaseTestClass { } /* - * Prepares running feed with instances ordered (desc): 1 waiting, 1 suspended, 1 running, + * Prepares running feed with instances ordered (desc): 1 waiting, 1 running, 1 suspended, * 3 waiting and 6 killed. Testing is based on expected instances statuses. */ private void prepareScenario() throws AuthenticationException, IOException, URISyntaxException, @@ -284,9 +284,9 @@ public class ListFeedInstancesTest extends BaseTestClass { "start=" + TimeUtil.addMinsToTime(endTime, -5) + "&end=" + endTime, null); InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); - //only start, actual feed startTime, should get 10 most recent instances(by default). + //only start, actual feed startTime, should get 1-10 instances(end is automatically set to freq*10). r = prism.getFeedHelper().listInstances(feedName, "start=" + startTime, null); - InstanceUtil.validateResponse(r, 10, 1, 1, 4, 4); + InstanceUtil.validateResponse(r, 10, 0, 1, 3, 6); //only start, greater then the actual startTime. r = prism.getFeedHelper().listInstances(feedName, http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java index f35e12d..be8a631 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/lineage/ListProcessInstancesTest.java @@ -19,7 +19,6 @@ package org.apache.falcon.regression.lineage; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.util.BundleUtil; @@ -75,15 +74,12 @@ public class ListProcessInstancesTest extends BaseTestClass { bundles[0].setInputFeedDataPath(feedDataLocation); bundles[0].setOutputFeedLocationData(baseTestHDFSDir + "/output" + MINUTE_DATE_PATTERN); bundles[0].setProcessValidity(startTime, endTime); - bundles[0].setInputFeedPeriodicity(5, Frequency.TimeUnit.minutes); bundles[0].setProcessConcurrency(3); bundles[0].submitAndScheduleProcess(); processName = bundles[0].getProcessName(); InstanceUtil.waitTillInstancesAreCreated(clusterOC, bundles[0].getProcessData(), 0); //create data for processes to run and wait some time for instances to make progress - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 0); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 1); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 2); + OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); } @@ -94,8 +90,7 @@ public class ListProcessInstancesTest extends BaseTestClass { } /** - * List process instances using orderBy - status, -startTime, -endTime params, expecting list of - * process instances in the right order. + * List process instances using orderBy - status, -startTime, -endTime params. */ @Test public void testProcessOrderBy() throws Exception { @@ -172,12 +167,12 @@ public class ListProcessInstancesTest extends BaseTestClass { //use start option without numResults. 10 instances expected r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null); - InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0); + InstanceUtil.validateResponse(r, 10, 1, 0, 9, 0); //use start option with numResults value which is smaller then default. r = prism.getProcessHelper().listInstances(processName, "start=" + startTime + "&numResults=8", null); - InstanceUtil.validateResponse(r, 8, 3, 0, 5, 0); + InstanceUtil.validateResponse(r, 8, 0, 0, 8, 0); //use start option with numResults value greater then default. All 12 instances expected r = prism.getProcessHelper().listInstances(processName, @@ -242,8 +237,6 @@ public class ListProcessInstancesTest extends BaseTestClass { InstanceUtil.validateResponse(r, 1, 0, 0, 0, 1); //wait till new instances be RUNNING and total status count be stable - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 3); - OozieUtil.createMissingDependencies(cluster, EntityType.PROCESS, processName, 0, 4); InstanceUtil.waitTillInstanceReachState(clusterOC, processName, 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS, 3); @@ -300,7 +293,7 @@ public class ListProcessInstancesTest extends BaseTestClass { + "&end=" + TimeUtil.addMinsToTime(startTime, 16), null); InstanceUtil.validateResponse(r, 1, 0, 0, 1, 0); - //only start, actual startTime, should get 10 most recent instances + //only start, actual startTime (end is automatically set to start + frequency * 10) r = prism.getProcessHelper().listInstances(processName, "start=" + startTime, null); InstanceUtil.validateResponse(r, 10, 3, 0, 7, 0); http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java index 4946b30..10ab192 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/NewPrismProcessUpdateTest.java @@ -87,7 +87,7 @@ public class NewPrismProcessUpdateTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void testSetup() throws Exception { - Bundle b = BundleUtil.readUpdateBundle(); + final Bundle b = BundleUtil.readUpdateBundle(); bundles[0] = new Bundle(b, cluster1); bundles[0].generateUniqueBundle(this); bundles[1] = new Bundle(b, cluster2); http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java index b7e6861..e8b7ed8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismFeedDeleteTest.java @@ -68,7 +68,7 @@ public class PrismFeedDeleteTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp() throws Exception { restartRequired = false; - Bundle bundle = BundleUtil.readELBundle(); + final Bundle bundle = BundleUtil.readELBundle(); bundles[0] = new Bundle(bundle, cluster1); bundles[0].generateUniqueBundle(this); bundles[0].setProcessWorkflow(aggregateWorkflowDir); @@ -361,8 +361,10 @@ public class PrismFeedDeleteTest extends BaseTestClass { @Test(groups = {"multiCluster"}) public void testServer1FeedDeleteNonExistentWhen1ColoIsDownDuringDelete() throws Exception { restartRequired = true; - bundles[0] = new Bundle(bundles[0], cluster1); - bundles[1] = new Bundle(bundles[1], cluster2); + bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1); + bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2); + bundles[0].generateUniqueBundle(this); + bundles[1].generateUniqueBundle(this); bundles[0].setCLusterColo(cluster1Colo); LOGGER.info("cluster bundle1: " + Util.prettyPrintXml(bundles[0].getClusters().get(0))); http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java index f90a76b..23878df 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/PrismSubmitTest.java @@ -59,8 +59,7 @@ public class PrismSubmitTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setUp() throws Exception { restartRequired = false; - bundles[0] = BundleUtil.readELBundle(); - bundles[0] = new Bundle(bundles[0], cluster1); + bundles[0] = new Bundle(BundleUtil.readELBundle(), cluster1); bundles[0].generateUniqueBundle(this); bundles[0].setProcessWorkflow(aggregateWorkflowDir); } @@ -308,7 +307,7 @@ public class PrismSubmitTest extends BaseTestClass { @Test(groups = {"prism", "0.2", "distributed"}) public void submitClusterReSubmitAlreadyPartial() throws Exception { restartRequired = true; - bundles[1] = new Bundle(bundles[0], cluster2); + bundles[1] = new Bundle(BundleUtil.readELBundle(), cluster2); bundles[1].generateUniqueBundle(this); bundles[1].setProcessWorkflow(aggregateWorkflowDir); http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java index e10f8d1..afa01c1 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/prism/UpdateAtSpecificTimeTest.java @@ -83,7 +83,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { @BeforeMethod(alwaysRun = true) public void setup() throws IOException { - Bundle bundle = BundleUtil.readFeedReplicationBundle(); + final Bundle bundle = BundleUtil.readFeedReplicationBundle(); bundles[0] = new Bundle(bundle, cluster1); bundles[1] = new Bundle(bundle, cluster2); bundles[2] = new Bundle(bundle, cluster3); @@ -92,8 +92,7 @@ public class UpdateAtSpecificTimeTest extends BaseTestClass { bundles[1].generateUniqueBundle(this); bundles[2].generateUniqueBundle(this); - processBundle = BundleUtil.readELBundle(); - processBundle = new Bundle(processBundle, cluster1); + processBundle = new Bundle(BundleUtil.readELBundle(), cluster1); processBundle.generateUniqueBundle(this); processBundle.setProcessWorkflow(aggregateWorkflowDir); } http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java index 63da183..c53e06b 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntitiesTableTest.java @@ -158,6 +158,9 @@ public class EntitiesTableTest extends BaseUITestClass { @DataProvider public Object[][] getBoolean() { - return new Boolean[][]{{Boolean.TRUE}, {Boolean.FALSE}}; + return new Boolean[][]{ + {Boolean.TRUE}, + {Boolean.FALSE}, + }; } } http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java index 4c1e379..4ad775e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/EntityPageTest.java @@ -588,8 +588,8 @@ public class EntityPageTest extends BaseUITestClass { .getProcessInstanceLogs(process.getName(), "start=" + nominalTimeOfSelectedInstance + "&end=" + TimeUtil.addMinsToTime(nominalTimeOfSelectedInstance, 1)); - Assert.assertEquals(getDriver().getCurrentUrl().replaceFirst("/\\?", "?"), - processInstanceLogs.getInstances()[0].getLogFile(), + Assert.assertEquals(getDriver().getCurrentUrl().replaceFirst("/\\?", "?").toLowerCase(), + processInstanceLogs.getInstances()[0].getLogFile().toLowerCase(), "Only one instance is selected. " + "Clicking instance log button should take user to oozie page."); } http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java index d8aed28..47b1d19 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/FeedSetupTest.java @@ -139,6 +139,7 @@ public class FeedSetupTest extends BaseUITestClass{ @Test public void testWizardCancel() throws Exception { // Step 1 - Check cancel on the first page - General Info Page + feedWizardPage.setFeedGeneralInfo(feed); feedWizardPage.clickCancel(); searchPage.checkPage(); @@ -146,6 +147,7 @@ public class FeedSetupTest extends BaseUITestClass{ feedWizardPage = searchPage.getPageHeader().doCreateFeed(); feedWizardPage.setFeedGeneralInfo(feed); feedWizardPage.clickNext(); + feedWizardPage.setFeedPropertiesInfo(feed); feedWizardPage.clickCancel(); searchPage.checkPage(); @@ -155,6 +157,7 @@ public class FeedSetupTest extends BaseUITestClass{ feedWizardPage.clickNext(); feedWizardPage.setFeedPropertiesInfo(feed); feedWizardPage.clickNext(); + feedWizardPage.setFeedLocationInfo(feed); feedWizardPage.clickCancel(); searchPage.checkPage(); @@ -166,6 +169,7 @@ public class FeedSetupTest extends BaseUITestClass{ feedWizardPage.clickNext(); feedWizardPage.setFeedLocationInfo(feed); feedWizardPage.clickNext(); + feedWizardPage.setFeedClustersInfo(feed); feedWizardPage.clickCancel(); searchPage.checkPage(); http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java index f71739d..20864f6 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/HomePageTest.java @@ -111,18 +111,26 @@ public class HomePageTest extends BaseUITestClass { final String clusterXml = bundle.getClusterElement().toString(); homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(clusterXml)); + String alert = homePage.getActiveAlertText(); + Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'"); AssertUtil.assertSucceeded(prism.getClusterHelper().getEntityDefinition(clusterXml)); final String feedXml = bundle.getInputFeedFromBundle(); homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(feedXml)); + alert = homePage.getActiveAlertText(); + Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'"); AssertUtil.assertSucceeded(prism.getFeedHelper().getEntityDefinition(feedXml)); final String outputFeedXml = bundle.getOutputFeedFromBundle(); homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(outputFeedXml)); + alert = homePage.getActiveAlertText(); + Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'"); AssertUtil.assertSucceeded(prism.getFeedHelper().getEntityDefinition(outputFeedXml)); final String processXml = bundle.getProcessObject().toString(); homePage.getPageHeader().uploadXml(FileUtil.writeEntityToFile(processXml)); + alert = homePage.getActiveAlertText(); + Assert.assertTrue(alert.contains("Submit successful"), "Not expected alert: '" + alert + "'"); AssertUtil.assertSucceeded(prism.getProcessHelper().getEntityDefinition(processXml)); } @@ -161,7 +169,6 @@ public class HomePageTest extends BaseUITestClass { writer.close(); homePage.getPageHeader().uploadXml(xmlFile.getAbsolutePath()); - Thread.sleep(1000); alertText = homePage.getActiveAlertText(); Assert.assertEquals(alertText, "Invalid xml. File not uploaded", "XML file with invalid text was allowed to be uploaded"); http://git-wip-us.apache.org/repos/asf/falcon/blob/f9669000/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java index 9ec936d..8598b18 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/searchUI/InstancePageTest.java @@ -22,7 +22,11 @@ import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; -import org.apache.falcon.regression.core.util.*; +import org.apache.falcon.regression.core.util.BundleUtil; +import org.apache.falcon.regression.core.util.HadoopUtil; +import org.apache.falcon.regression.core.util.InstanceUtil; +import org.apache.falcon.regression.core.util.OSUtil; +import org.apache.falcon.regression.core.util.OozieUtil; import org.apache.falcon.regression.testHelper.BaseUITestClass; import org.apache.falcon.regression.ui.search.EntityPage; import org.apache.falcon.regression.ui.search.InstancePage; @@ -33,9 +37,7 @@ import org.apache.falcon.resource.InstancesResult; import org.apache.oozie.client.CoordinatorAction; import org.apache.oozie.client.OozieClient; import org.testng.Assert; -import org.testng.annotations.AfterClass; import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeClass; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -59,14 +61,10 @@ public class InstancePageTest extends BaseUITestClass { private String instance = "2010-01-02T01:00Z"; private String processName; - @BeforeClass(alwaysRun = true) - public void setup() { - openBrowser(); - searchPage = LoginPage.open(getDriver()).doDefaultLogin(); - } - @BeforeMethod(alwaysRun = true) public void submitEntities() throws Exception { + openBrowser(); + searchPage = LoginPage.open(getDriver()).doDefaultLogin(); cleanAndGetTestDir(); HadoopUtil.uploadDir(serverFS.get(0), aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); bundles[0] = BundleUtil.readELBundle(); @@ -184,11 +182,7 @@ public class InstancePageTest extends BaseUITestClass { @AfterMethod(alwaysRun = true) public void tearDown() throws IOException { - removeTestClassEntities(); - } - - @AfterClass(alwaysRun = true) - public void tearDownClass() { closeBrowser(); + removeTestClassEntities(); } }
