http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/resources/log4j.xml b/addons/hivedr/src/main/resources/log4j.xml
new file mode 100644
index 0000000..f83a9a9
--- /dev/null
+++ b/addons/hivedr/src/main/resources/log4j.xml
@@ -0,0 +1,54 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<!--
+  This is used for falcon packaging only.
+  -->
+
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+    <appender name="console" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <layout class="org.apache.log4j.PatternLayout">
+            <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/>
+        </layout>
+    </appender>

+    <logger name="org.apache.falcon" additivity="false">
+        <level value="debug"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="org.apache.hadoop" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <logger name="org.apache.hadoop.hive" additivity="false">
+        <level value="info"/>
+        <appender-ref ref="console"/>
+    </logger>
+
+    <root>
+        <priority value="info"/>
+        <appender-ref ref="console"/>
+    </root>
+
+</log4j:configuration>
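
[Editor's note: the following is a minimal illustrative sketch, not part of this commit, of how Log4j 1.x applies the configuration above: loggers under org.apache.falcon emit DEBUG and above to the console appender, while loggers under org.apache.hadoop and org.apache.hadoop.hive are limited to INFO. The class name Log4jConfigCheck and the explicit configure() call are assumptions for the example; when the file is packaged, Log4j normally picks it up from the classpath.]

    import org.apache.log4j.Logger;
    import org.apache.log4j.xml.DOMConfigurator;

    // Illustrative class, not part of the Falcon source tree.
    public class Log4jConfigCheck {
        public static void main(String[] args) {
            // Load the packaged configuration explicitly; on a real classpath this happens automatically.
            DOMConfigurator.configure("addons/hivedr/src/main/resources/log4j.xml");

            // org.apache.falcon.* is configured at DEBUG, so this message reaches the console appender.
            Logger.getLogger("org.apache.falcon.hive.HiveDRTool").debug("visible: falcon loggers emit DEBUG");

            // org.apache.hadoop.* is configured at INFO, so DEBUG is filtered while INFO is emitted.
            Logger.getLogger("org.apache.hadoop.fs.FileSystem").debug("suppressed: below the INFO threshold");
            Logger.getLogger("org.apache.hadoop.fs.FileSystem").info("visible: INFO and above are emitted");
        }
    }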
http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java new file mode 100644 index 0000000..bfeca8d --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DBReplicationStatusTest.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.hive; + +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.DBReplicationStatus; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.HashMap; +import java.util.Map; + +/** + * Unit tests for DBReplicationStatus. 
+ */ +@Test +public class DBReplicationStatusTest { + + private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>(); + private ReplicationStatus dbReplicationStatus; + private ReplicationStatus tableStatus1; + + public DBReplicationStatusTest() { + } + + @BeforeClass + public void prepare() throws Exception { + dbReplicationStatus = new ReplicationStatus("source", "target", "jobname", + "Default1", null, ReplicationStatus.Status.FAILURE, 20L); + tableStatus1 = new ReplicationStatus("source", "target", "jobname", + "default1", "Table1", ReplicationStatus.Status.SUCCESS, 20L); + tableStatuses.put("Table1", tableStatus1); + + } + + public void dBReplicationStatusSerializeTest() throws Exception { + DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses); + + String expected = "{\n" + " \"db_status\": {\n" + + " \"sourceUri\": \"source\",\n" + " \"targetUri\": \"target\",\n" + + " \"jobName\": \"jobname\",\n" + " \"database\": \"default1\",\n" + + " \"status\": \"FAILURE\",\n" + " \"eventId\": 20\n" + " },\n" + + " \"table_status\": {\"table1\": {\n" + " \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n" + " \"jobName\": \"jobname\",\n" + + " \"database\": \"default1\",\n" + " \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n" + " \"eventId\": 20\n" + " }}\n" + "}"; + String actual = replicationStatus.toJsonString(); + Assert.assertEquals(actual, expected); + } + + public void dBReplicationStatusDeserializeTest() throws Exception { + + String jsonString = "{\"db_status\":{\"sourceUri\":\"source\"," + + "\"targetUri\":\"target\",\"jobName\":\"jobname\",\"database\":\"default1\",\"status\":\"SUCCESS\"," + + "\"eventId\":20},\"table_status\":{\"Table1\":{\"sourceUri\":\"source\",\"targetUri\":\"target\"," + + "\"jobName\":\"jobname\",\"database\":\"default1\",\"table\":\"Table1\",\"status\":\"SUCCESS\"," + + "\"eventId\":20},\"table3\":{\"sourceUri\":\"source\",\"targetUri\":\"target\"," + + "\"jobName\":\"jobname\", \"database\":\"Default1\",\"table\":\"table3\",\"status\":\"FAILURE\"," + + "\"eventId\":10}, \"table2\":{\"sourceUri\":\"source\",\"targetUri\":\"target\"," + + "\"jobName\":\"jobname\", \"database\":\"default1\",\"table\":\"table2\",\"status\":\"INIT\"}}}"; + + DBReplicationStatus dbStatus = new DBReplicationStatus(jsonString); + Assert.assertEquals(dbStatus.getDatabaseStatus().getDatabase(), "default1"); + Assert.assertEquals(dbStatus.getDatabaseStatus().getJobName(), "jobname"); + Assert.assertEquals(dbStatus.getDatabaseStatus().getEventId(), 20); + + Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getEventId(), 20); + Assert.assertEquals(dbStatus.getTableStatuses().get("table1").getStatus(), ReplicationStatus.Status.SUCCESS); + Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getEventId(), -1); + Assert.assertEquals(dbStatus.getTableStatuses().get("table2").getStatus(), ReplicationStatus.Status.INIT); + Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getEventId(), 10); + Assert.assertEquals(dbStatus.getTableStatuses().get("table3").getStatus(), ReplicationStatus.Status.FAILURE); + + + } + + public void wrongDBForTableTest() throws Exception { + + ReplicationStatus newDbStatus = new ReplicationStatus("source", "target", "jobname", + "wrongDb", null, ReplicationStatus.Status.FAILURE, 20L); + new DBReplicationStatus(newDbStatus); + + try { + new DBReplicationStatus(newDbStatus, tableStatuses); + Assert.fail(); + } catch 
(HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot set status for table default1.table1, It does not belong to DB wrongdb"); + } + + String jsonString = "{\n" + " \"db_status\": {\n" + + " \"sourceUri\": \"source\",\n" + " \"targetUri\": \"target\",\n" + + " \"jobName\": \"jobname\",\n" + " \"database\": \"wrongdb\",\n" + + " \"status\": \"FAILURE\",\n" + " \"eventId\": 20\n" + " },\n" + + " \"table_status\": {\"table1\": {\n" + " \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n" + " \"jobName\": \"jobname\",\n" + + " \"database\": \"default1\",\n" + " \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n" + " \"eventId\": 20\n" + " }}\n" + "}"; + + try { + new DBReplicationStatus(jsonString); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Unable to create DBReplicationStatus from JsonString. Cannot set status for " + + "table default1.table1, It does not belong to DB wrongdb"); + } + } + + public void updateTableStatusTest() throws Exception { + DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses); + replicationStatus.updateTableStatus(tableStatus1); + + // wrong DB test + try { + replicationStatus.updateTableStatus(new ReplicationStatus("source", "target", "jobname", + "wrongDB", "table2", ReplicationStatus.Status.INIT, -1L)); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update Table Status. TableDB wrongdb does not match current DB default1"); + } + + // wrong status test + try { + replicationStatus.updateTableStatus(dbReplicationStatus); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update Table Status. Table name is empty."); + } + + } + + public void updateDBStatusTest() throws Exception { + DBReplicationStatus replicationStatus = new DBReplicationStatus(dbReplicationStatus, tableStatuses); + replicationStatus.updateDbStatus(dbReplicationStatus); + + // wrong DB test + try { + replicationStatus.updateDbStatus(new ReplicationStatus("source", "target", "jobname", + "wrongDB", null, ReplicationStatus.Status.INIT, -1L)); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update Database Status. StatusDB wrongdb does not match current DB default1"); + } + + // wrong status test + try { + replicationStatus.updateDbStatus(tableStatus1); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Cannot update DB Status. 
This is table level status."); + } + } + + public void updateDbStatusFromTableStatusesTest() throws Exception { + + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname", + "default1", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname", + "default1", "table1", ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname", + "Default1", "table2", ReplicationStatus.Status.INIT, -1L); + ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname", + "default1", "Table3", ReplicationStatus.Status.FAILURE, 15L); + ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname", + "Default1", "Table4", ReplicationStatus.Status.FAILURE, 18L); + Map<String, ReplicationStatus> tables = new HashMap<String, ReplicationStatus>(); + + tables.put("table1", table1); + tables.put("table2", table2); + tables.put("table3", table3); + tables.put("table4", table4); + + // If there is a failue, last eventId should be lowest eventId of failed tables + DBReplicationStatus status = new DBReplicationStatus(dbStatus, tables); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + status.updateDbStatusFromTableStatuses(); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 15); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.FAILURE); + + // If all tables succeed, last eventId should be highest eventId of success tables + table3 = new ReplicationStatus("source", "target", "jobname", + "default1", "table3", ReplicationStatus.Status.SUCCESS, 25L); + table4 = new ReplicationStatus("source", "target", "jobname", + "default1", "table4", ReplicationStatus.Status.SUCCESS, 22L); + tables.put("Table3", table3); + tables.put("Table4", table4); + status = new DBReplicationStatus(dbStatus, tables); + status.updateDbStatusFromTableStatuses(); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 25); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + + // Init tables should not change DB status. 
+ Map<String, ReplicationStatus> initOnlyTables = new HashMap<String, ReplicationStatus>(); + initOnlyTables.put("table2", table2); + dbStatus = new ReplicationStatus("source", "target", "jobname", + "default1", null, ReplicationStatus.Status.SUCCESS, 20L); + status = new DBReplicationStatus(dbStatus, initOnlyTables); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + status.updateDbStatusFromTableStatuses(); + Assert.assertEquals(status.getDatabaseStatus().getEventId(), 20); + Assert.assertEquals(status.getDatabaseStatus().getStatus(), ReplicationStatus.Status.SUCCESS); + + + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java new file mode 100644 index 0000000..1f44b62 --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/DRTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive; + +/** + * Test class for DR. + */ +public class DRTest { + public void testHiveDr(String[] args) { + String[] testArgs = { + "-sourceMetastoreUri", "thrift://localhost:9083", + "-sourceDatabase", "default", + "-sourceTable", "test", + "-sourceStagingPath", "/apps/hive/tools/dr", + "-sourceNN", "hdfs://localhost:8020", + "-sourceRM", "local", + + "-targetMetastoreUri", "thrift://localhost:9083", + "-targetStagingPath", "/apps/hive/tools/dr", + "-targetNN", "hdfs://localhost:8020", + "-targetRM", "local", + + "-maxEvents", "5", + "-replicationMaxMaps", "1", + "-distcpMapBandwidth", "4", + }; + HiveDRTool.main(testArgs); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java new file mode 100644 index 0000000..c89c661 --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRStatusStoreTest.java @@ -0,0 +1,346 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.hive; + +import org.apache.falcon.cluster.util.EmbeddedCluster; +import org.apache.falcon.hadoop.JailedFileSystem; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.DRStatusStore; +import org.apache.falcon.hive.util.HiveDRStatusStore; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; + +/** + * Unit tests for HiveDRStatusStore. + */ +@Test +public class HiveDRStatusStoreTest { + private HiveDRStatusStore drStatusStore; + private FileSystem fileSystem = new JailedFileSystem(); + + public HiveDRStatusStoreTest() throws Exception { + EmbeddedCluster cluster = EmbeddedCluster.newCluster("hiveReplTest"); + Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH); + + fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf()); + if (fileSystem.exists(storePath)) { + fileSystem.delete(storePath, true); + } + FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION); + try { + new HiveDRStatusStore(fileSystem); + Assert.fail(); + } catch (IOException ie) { + // Exception expected. + Assert.assertEquals(ie.getMessage(), "Base dir jail://hiveReplTest:00" + storePath.toUri() + + " does not have correct ownership/permissions." 
+ + " Please set group to " + DRStatusStore.getStoreGroup() + " and permissions to rwxrwx---"); + } + drStatusStore = new HiveDRStatusStore(fileSystem, fileSystem.getFileStatus(storePath).getGroup()); + } + + @BeforeClass + public void updateReplicationStatusTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname", + "Default1", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname", + "Default1", "table1", ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname", + "default1", "Table2", ReplicationStatus.Status.INIT, -1L); + ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname", + "Default1", "Table3", ReplicationStatus.Status.FAILURE, 15L); + ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname", + "default1", "table4", ReplicationStatus.Status.FAILURE, 18L); + ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>(); + replicationStatusList.add(table1); + replicationStatusList.add(table2); + replicationStatusList.add(table3); + replicationStatusList.add(table4); + replicationStatusList.add(dbStatus); + drStatusStore.updateReplicationStatus("jobname", replicationStatusList); + } + + public void updateReplicationStatusNewTablesTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname2", + "default2", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname2", + "Default2", "table1", ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table2 = new ReplicationStatus("source", "target", "jobname2", + "default2", "Table2", ReplicationStatus.Status.INIT, -1L); + ReplicationStatus table3 = new ReplicationStatus("source", "target", "jobname2", + "default2", "table3", ReplicationStatus.Status.FAILURE, 15L); + ReplicationStatus table4 = new ReplicationStatus("source", "target", "jobname2", + "Default2", "Table4", ReplicationStatus.Status.FAILURE, 18L); + ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>(); + replicationStatusList.add(table1); + replicationStatusList.add(table2); + replicationStatusList.add(table3); + replicationStatusList.add(table4); + replicationStatusList.add(dbStatus); + + drStatusStore.updateReplicationStatus("jobname2", replicationStatusList); + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2"); + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getJobName(), "jobname2"); + Assert.assertEquals(status.getTable(), null); + Assert.assertEquals(status.getSourceUri(), "source"); + + Iterator<ReplicationStatus> iter = drStatusStore.getTableReplicationStatusesInDb("source", "target", + "jobname2", "default2"); + int size = 0; + while(iter.hasNext()) { + iter.next(); + size++; + } + Assert.assertEquals(4, size); + + table3 = new ReplicationStatus("source", "target", "jobname2", + "default2", "table3", ReplicationStatus.Status.SUCCESS, 25L); + table4 = new ReplicationStatus("source", "target", "jobname2", + "Default2", "table4", ReplicationStatus.Status.SUCCESS, 22L); + ReplicationStatus table5 = new ReplicationStatus("source", "target", "jobname2", + "default2", "Table5", 
ReplicationStatus.Status.SUCCESS, 18L); + ReplicationStatus db1table1 = new ReplicationStatus("source", "target", "jobname2", + "Default1", "Table1", ReplicationStatus.Status.SUCCESS, 18L); + replicationStatusList = new ArrayList<ReplicationStatus>(); + replicationStatusList.add(table5); + replicationStatusList.add(table3); + replicationStatusList.add(table4); + replicationStatusList.add(db1table1); + + drStatusStore.updateReplicationStatus("jobname2", replicationStatusList); + status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default1"); + Assert.assertEquals(status.getEventId(), 18); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS); + + status = drStatusStore.getReplicationStatus("source", "target", "jobname2", "default2"); + Assert.assertEquals(status.getEventId(), 25); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS); + + iter = drStatusStore.getTableReplicationStatusesInDb("source", "target", + "jobname2", "default2"); + size = 0; + while(iter.hasNext()) { + iter.next(); + size++; + } + Assert.assertEquals(5, size); + } + + public void getReplicationStatusDBTest() throws HiveReplicationException { + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "jobname", "Default1"); + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getJobName(), "jobname"); + Assert.assertEquals(status.getTable(), null); + Assert.assertEquals(status.getSourceUri(), "source"); + } + + public void checkReplicationConflictTest() throws HiveReplicationException { + + try { + //same source, same job, same DB, null table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "default1", null); + + //same source, same job, same DB, same table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "default1", "table1"); + + //same source, same job, different DB, null table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", null); + + //same source, same job, different DB, different table : pass + drStatusStore.checkForReplicationConflict("source", "jobname", "diffDB", "diffTable"); + + // same source, different job, same DB, diff table : pass + drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "diffTable"); + } catch (Exception e) { + Assert.fail(e.getMessage()); + } + + try { + // different source, same job, same DB, null table : fail + drStatusStore.checkForReplicationConflict("diffSource", "jobname", "default1", null); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Two different sources are attempting to replicate to same db default1." + + " New Source = diffSource, Existing Source = source"); + } + + try { + // same source, different job, same DB, null table : fail + drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", null); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Two different jobs are attempting to replicate to same db default1." 
+ + " New Job = diffJob, Existing Job = jobname"); + } + + try { + // same source, different job, same DB, same table : fail + drStatusStore.checkForReplicationConflict("source", "diffJob", "default1", "table1"); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Two different jobs are trying to replicate to same table table1." + + " New job = diffJob, Existing job = jobname"); + } + + + } + + public void deleteReplicationStatusTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "deleteJob", + "deleteDB", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "deleteJob", + "deleteDB", "Table1", ReplicationStatus.Status.SUCCESS, 20L); + ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>(); + replicationStatusList.add(table1); + replicationStatusList.add(dbStatus); + drStatusStore.updateReplicationStatus("deleteJob", replicationStatusList); + + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", "deleteJob", "deleteDB"); + Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName()); + Assert.assertEquals(fileSystem.exists(statusPath), true); + + drStatusStore.deleteReplicationStatus("deleteJob", "deleteDB"); + Assert.assertEquals(fileSystem.exists(statusPath), false); + } + + public void getReplicationStatusTableTest() throws HiveReplicationException { + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "default1", "table1"); + Assert.assertEquals(status.getEventId(), 20); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.SUCCESS); + Assert.assertEquals(status.getTable(), "table1"); + + status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "Default1", "Table2"); + Assert.assertEquals(status.getEventId(), -1); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT); + Assert.assertEquals(status.getTable(), "table2"); + + status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "default1", "Table3"); + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getTable(), "table3"); + + status = drStatusStore.getReplicationStatus("source", "target", + "jobname", "default1", "table4"); + Assert.assertEquals(status.getEventId(), 18); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getTable(), "table4"); + } + + public void getTableReplicationStatusesInDbTest() throws HiveReplicationException { + Iterator<ReplicationStatus> iter = drStatusStore.getTableReplicationStatusesInDb("source", "target", + "jobname", "Default1"); + int size = 0; + while(iter.hasNext()) { + size++; + ReplicationStatus status = iter.next(); + if (status.getTable().equals("table3")) { + Assert.assertEquals(status.getEventId(), 15); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.FAILURE); + Assert.assertEquals(status.getTable(), "table3"); + } + } + Assert.assertEquals(4, size); + } + + public void fileRotationTest() throws Exception { + // initialize replication status store for db default3. 
+ // This should init with eventId = -1 and status = INIT + ReplicationStatus status = drStatusStore.getReplicationStatus("source", "target", + "jobname3", "default3"); + Assert.assertEquals(status.getEventId(), -1); + Assert.assertEquals(status.getStatus(), ReplicationStatus.Status.INIT); + + // update status 5 times resulting in 6 files : latest.json + five rotated files + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3", + "Default3", null, ReplicationStatus.Status.SUCCESS, 20L); + ReplicationStatus table1 = new ReplicationStatus("source", "target", "jobname3", + "default3", "Table1", ReplicationStatus.Status.SUCCESS, 20L); + ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>(); + replicationStatusList.add(table1); + replicationStatusList.add(dbStatus); + + for(int i=0; i<5; i++) { + Thread.sleep(2000); + drStatusStore.updateReplicationStatus("jobname3", replicationStatusList); + } + + status = drStatusStore.getReplicationStatus("source", "target", "jobname3", "default3"); + Path statusPath = drStatusStore.getStatusDirPath(status.getDatabase(), status.getJobName()); + RemoteIterator<LocatedFileStatus> iter = fileSystem.listFiles(statusPath, false); + Assert.assertEquals(getRemoteIterSize(iter), 6); + + drStatusStore.rotateStatusFiles(statusPath, 3, 10000000); + iter = fileSystem.listFiles(statusPath, false); + Assert.assertEquals(getRemoteIterSize(iter), 6); + + drStatusStore.rotateStatusFiles(statusPath, 3, 6000); + iter = fileSystem.listFiles(statusPath, false); + Assert.assertEquals(getRemoteIterSize(iter), 3); + } + + public void wrongJobNameTest() throws Exception { + ReplicationStatus dbStatus = new ReplicationStatus("source", "target", "jobname3", + "Default3", null, ReplicationStatus.Status.SUCCESS, 20L); + ArrayList<ReplicationStatus> replicationStatusList = new ArrayList<ReplicationStatus>(); + replicationStatusList.add(dbStatus); + + try { + drStatusStore.updateReplicationStatus("jobname2", replicationStatusList); + Assert.fail(); + } catch (HiveReplicationException e) { + // Expected exception due to jobname mismatch + } + } + + @AfterClass + public void cleanUp() throws IOException { + fileSystem.delete(new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH), true); + } + + private int getRemoteIterSize(RemoteIterator<LocatedFileStatus> iter) throws IOException { + int size = 0; + while(iter.hasNext()) { + iter.next(); + size++; + } + return size; + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java new file mode 100644 index 0000000..a02639c --- /dev/null +++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/ReplicationStatusTest.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.hive; + +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * Unit tests for ReplicationStatus. + */ +@Test +public class ReplicationStatusTest { + + private ReplicationStatus dbStatus, tableStatus; + + public ReplicationStatusTest() {} + + + @BeforeClass + public void prepare() throws Exception { + dbStatus = new ReplicationStatus("source", "target", "jobname", + "default1", null, ReplicationStatus.Status.INIT, 0L); + tableStatus = new ReplicationStatus("source", "target", "jobname", + "testDb", "Table1", ReplicationStatus.Status.SUCCESS, 0L); + } + + public void replicationStatusSerializeTest() throws Exception { + String expected = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"jobname\",\n" + + " \"database\": \"testdb\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": 0\n}"; + String actual = tableStatus.toJsonString(); + Assert.assertEquals(actual, expected); + + expected = "{\n \"sourceUri\": \"source\",\n \"targetUri\": \"target\",\n" + + " \"jobName\": \"jobname\",\n \"database\": \"default1\",\n" + + " \"status\": \"INIT\",\n \"eventId\": 0\n}"; + actual = dbStatus.toJsonString(); + Assert.assertEquals(actual, expected); + } + + public void replicationStatusDeserializeTest() throws Exception { + String tableInput = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"Test1\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": 0\n}"; + String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n" + + " \"database\": \"default1\", \"status\": \"FAILURE\"," + + " \"eventId\": 27, \"statusLog\": \"testLog\"}"; + + ReplicationStatus newDbStatus = new ReplicationStatus(dbInput); + ReplicationStatus newTableStatus = new ReplicationStatus(tableInput); + + Assert.assertEquals(newDbStatus.getTable(), null); + Assert.assertEquals(newDbStatus.getEventId(), 27); + Assert.assertEquals(newDbStatus.getDatabase(), "default1"); + Assert.assertEquals(newDbStatus.getLog(), "testLog"); + Assert.assertEquals(newDbStatus.getStatus(), ReplicationStatus.Status.FAILURE); + + + Assert.assertEquals(newTableStatus.getTable(), "table1"); + Assert.assertEquals(newTableStatus.getEventId(), 0); + Assert.assertEquals(newTableStatus.getDatabase(), "test1"); + Assert.assertEquals(newTableStatus.getJobName(), "testJob"); + + // no table, no eventId, no log + dbInput = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"Test1\",\n" + + " \"status\": \"SUCCESS\"\n}"; + newDbStatus = new ReplicationStatus(dbInput); + + Assert.assertEquals(newDbStatus.getDatabase(), "test1"); + Assert.assertEquals(newDbStatus.getTable(), null); + Assert.assertEquals(newDbStatus.getEventId(), -1); + Assert.assertEquals(newDbStatus.getLog(), null); + + 
} + + public void invalidEventIdTest() throws Exception { + String tableInput = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"test1\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": -100\n}"; + + ReplicationStatus newTableStatus = new ReplicationStatus(tableInput); + Assert.assertEquals(newTableStatus.getEventId(), -1); + + newTableStatus.setEventId(-200); + Assert.assertEquals(newTableStatus.getEventId(), -1); + + String expected = "{\n \"sourceUri\": \"source\",\n" + + " \"targetUri\": \"target\",\n \"jobName\": \"testJob\",\n" + + " \"database\": \"test1\",\n \"table\": \"table1\",\n" + + " \"status\": \"SUCCESS\",\n \"eventId\": -1\n}"; + String actual = newTableStatus.toJsonString(); + Assert.assertEquals(actual, expected); + + newTableStatus.setEventId(50); + Assert.assertEquals(newTableStatus.getEventId(), 50); + } + + public void invalidStatusTest() throws Exception { + + String dbInput = "{ \"sourceUri\": \"source\", \"targetUri\": \"target\",\"jobName\": \"jobname\",\n" + + " \"database\": \"default1\", \"status\": \"BLAH\"," + + " \"eventId\": 27, \"statusLog\": \"testLog\"}"; + + try { + new ReplicationStatus(dbInput); + Assert.fail(); + } catch (HiveReplicationException e) { + Assert.assertEquals(e.getMessage(), + "Unable to deserialize jsonString to ReplicationStatus. Invalid status BLAH"); + } + } + + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml index 824e6f5..de0f748 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-template.xml @@ -17,30 +17,27 @@ limitations under the License. 
--> -<process name="##name##" xmlns="uri:falcon:process:0.1"> +<process name="##falcon.recipe.job.name##" xmlns="uri:falcon:process:0.1"> <clusters> <!-- source --> - <cluster name="##src.cluster.name##"> - <validity end="##src.cluster.validity.end##" start="##src.cluster.validity.start##"/> + <cluster name="##falcon.recipe.cluster.name##"> + <validity end="##falcon.recipe.cluster.validity.end##" start="##falcon.recipe.cluster.validity.start##"/> </cluster> </clusters> + <tags>_falcon_mirroring_type=HDFS</tags> + <parallel>1</parallel> <!-- Dir replication needs to run only once to catch up --> <order>LAST_ONLY</order> - <frequency>##process.frequency##</frequency> + <frequency>##falcon.recipe.frequency##</frequency> <timezone>UTC</timezone> <properties> <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> - <property name="##process.property2.name##" value="##process.property2.value##"/> - <property name="##process.property3.name##" value="##process.property3.value##"/> - <property name="##process.property4.name##" value="##process.property4.value##"/> - <property name="##process.property5.name##" value="##process.property5.value##"/> - <property name="##process.property6.name##" value="##process.property6.value##"/> - <property name="##process.property7.name##" value="##process.property7.value##"/> </properties> - <workflow name="##workflow.name##" engine="oozie" path="##workflow.path##" lib="##workflow.lib.path##"/> - <retry policy="periodic" delay="minutes(10)" attempts="3"/> -</process> + <workflow name="##falcon.recipe.workflow.name##" engine="oozie" path="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml" lib="##workflow.lib.path##"/> + <retry policy="##falcon.recipe.retry.policy##" delay="##falcon.recipe.retry.delay##" attempts="3"/> + <ACL/> +</process> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml index 145d489..d6a4ee9 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication-workflow.xml @@ -47,28 +47,78 @@ <name>oozie.launcher.oozie.libpath</name> <value>${wf:conf("falcon.libpath")}</value> </property> + <property> + <name>oozie.launcher.mapreduce.job.hdfs-servers</name> + <value>${drSourceClusterFS},${drTargetClusterFS}</value> + </property> </configuration> <main-class>org.apache.falcon.replication.FeedReplicator</main-class> <arg>-Dmapred.job.queue.name=${queueName}</arg> <arg>-Dmapred.job.priority=${jobPriority}</arg> <arg>-maxMaps</arg> - <arg>${maxMaps}</arg> + <arg>${distcpMaxMaps}</arg> <arg>-mapBandwidth</arg> - <arg>${mapBandwidth}</arg> + <arg>${distcpMapBandwidth}</arg> <arg>-sourcePaths</arg> - <arg>${nameNode}${drSourceDir}</arg> + <arg>${drSourceDir}</arg> <arg>-targetPath</arg> <arg>${drTargetClusterFS}${drTargetDir}</arg> <arg>-falconFeedStorageType</arg> <arg>FILESYSTEM</arg> + <arg>-availabilityFlag</arg> + <arg>${availabilityFlag == 'NA' ? 
"NA" : availabilityFlag}</arg> + <arg>-counterLogDir</arg> + <arg>${logDir}/job-${nominalTime}</arg> </java> + <ok to="success"/> + <error to="failure"/> + </action> + <decision name="success"> + <switch> + <case to="successAlert"> + ${drNotificationReceivers ne 'NA'} + </case> + <default to="end"/> + </switch> + </decision> + <decision name="failure"> + <switch> + <case to="failureAlert"> + ${drNotificationReceivers ne 'NA'} + </case> + <default to="fail"/> + </switch> + </decision> + <action name="successAlert"> + <email xmlns="uri:oozie:email-action:0.2"> + <to>${drNotificationReceivers}</to> + <subject>INFO: HDFS DR workflow ${entityName} completed successfully</subject> + <body> + The HDFS DR workflow ${wf:id()} is successful. + Source = ${drSourceDir} + Target = ${drTargetClusterFS}${drTargetDir} + </body> + </email> + <ok to="end"/> + <error to="end"/> + </action> + <action name="failureAlert"> + <email xmlns="uri:oozie:email-action:0.2"> + <to>${drNotificationReceivers}</to> + <subject>ERROR: HDFS DR workflow ${entityName} failed</subject> + <body> + The workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())} + Source = ${drSourceDir} + Target = ${drTargetClusterFS}${drTargetDir} + </body> + </email> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message> - Workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> - <end name='end'/> + <end name="end"/> </workflow-app> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties ---------------------------------------------------------------------- diff --git a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties index 19b8459..64ab6b8 100644 --- a/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties +++ b/addons/recipes/hdfs-replication/src/main/resources/hdfs-replication.properties @@ -19,47 +19,59 @@ ##### NOTE: This is a TEMPLATE file which can be copied and edited ##### Recipe properties -falcon.recipe.name=hdfs-replication - +##### Unique recipe job name +falcon.recipe.name=sales-monthly ##### Workflow properties - falcon.recipe.workflow.name=hdfs-dr-workflow # Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS -falcon.recipe.workflow.path=/recipes/hdfs-replication/hdfs-replication-workflow.xml +falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml # Provide Wf lib absolute path. This can be HDFS or local FS path. If libs are on local FS it will be copied to HDFS #falcon.recipe.workflow.lib.path=/recipes/hdfs-replication/lib ##### Cluster properties +# Cluster where job should run +falcon.recipe.cluster.name=primaryCluster +# Change the cluster hdfs write end point here. This is mandatory. +falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://240.0.0.10:8020 +# Change the cluster validity start time here +falcon.recipe.cluster.validity.start=2015-03-13T00:00Z +# Change the cluster validity end time here +falcon.recipe.cluster.validity.end=2016-12-30T00:00Z -# Change the src cluster name here -falcon.recipe.src.cluster.name=test -# Change the src cluster hdfs write end point here. This is mandatory. 
-falcon.recipe.src.cluster.hdfs.writeEndPoint=hdfs://sandbox.hortonworks.com:8020 -# Change the src cluster validity start time here -falcon.recipe.src.cluster.validity.start=2014-10-01T00:00Z -# Change the src cluster validity end time here -falcon.recipe.src.cluster.validity.end=2016-12-30T00:00Z +##### Scheduling properties +# Change the recipe frequency here. Valid frequency type are minutes, hours, days, months +falcon.recipe.process.frequency=minutes(5) +##### Tag properties - An optional list of comma separated tags, Key Value Pairs, separated by comma +##### Uncomment to add tags +#falcon.recipe.tags= -##### Scheduling properties +##### Retry policy properties -# Change the process here. Valid frequency type are minutes, hours, days, months -falcon.recipe.process.frequency=minutes(60) +falcon.recipe.retry.policy=periodic +falcon.recipe.retry.delay=minutes(30) +falcon.recipe.retry.attempts=3 +##### ACL properties - Uncomment and change ACL if authorization is enabled + +falcon.recipe.acl.owner=ambari-qa +falcon.recipe.acl.group=users +falcon.recipe.acl.permission=0x755 +falcon.recipe.nn.principal=nn/[email protected] ##### Custom Job properties -# Specify property names and values for properties defined in recipe template -falcon.recipe.process.property2.name=drSourceDir -falcon.recipe.process.property2.value=/falcon/test/srcCluster/input -falcon.recipe.process.property3.name=drTargetClusterFS -falcon.recipe.process.property3.value=hdfs://sandbox.hortonworks.com:8020 -falcon.recipe.process.property4.name=drTargetDir -falcon.recipe.process.property4.value=/falcon/test/targetCluster/input -falcon.recipe.process.property5.name=drTargetCluster -falcon.recipe.process.property5.value=backupCluster -falcon.recipe.process.property6.name=maxMaps -falcon.recipe.process.property6.value=5 -falcon.recipe.process.property7.name=mapBandwidth -falcon.recipe.process.property7.value=100 +# Specify multiple comma separated source directories +drSourceDir=/user/hrt_qa/dr/test/primaryCluster/input +drSourceClusterFS=hdfs://240.0.0.10:8020 +drTargetDir=/user/hrt_qa/dr/test/backupCluster/input +drTargetClusterFS=hdfs://240.0.0.11:8020 + +# Change it to specify the maximum number of mappers for DistCP +distcpMaxMaps=1 +# Change it to specify the bandwidth in MB for each mapper in DistCP +distcpMapBandwidth=100 + +##### Email on failure +drNotificationReceivers=NA http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/README.txt ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/README.txt b/addons/recipes/hive-disaster-recovery/README.txt new file mode 100644 index 0000000..ab393b1 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/README.txt @@ -0,0 +1,58 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +Hive Metastore Disaster Recovery Recipe + +Overview +This recipe implements replicating hive metadata and data from one +Hadoop cluster to another Hadoop cluster. +This piggy backs on replication solution in Falcon which uses the DistCp tool. + +Use Case +* +* + +Limitations +* +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Hive Metastore Disaster Recovery Recipe + +Overview +This recipe implements replicating hive metadata and data from one +Hadoop cluster to another Hadoop cluster. +This piggy backs on replication solution in Falcon which uses the DistCp tool. + +Use Case +* +* + +Limitations +* http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/pom.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/pom.xml b/addons/recipes/hive-disaster-recovery/pom.xml new file mode 100644 index 0000000..1732907 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/pom.xml @@ -0,0 +1,32 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + <groupId>org.apache.falcon.recipes</groupId> + <artifactId>falcon-hive-replication-recipe</artifactId> + <version>0.7-SNAPSHOT</version> + <description>Apache Falcon Hive Disaster Recovery Recipe</description> + <name>Apache Falcon Sample Hive Disaster Recovery Recipe</name> + <packaging>jar</packaging> +</project> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml new file mode 100644 index 0000000..3afbef0 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-template.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="##name##" xmlns="uri:falcon:process:0.1"> + <clusters> + <!-- source --> + <cluster name="##cluster.name##"> + <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/> + </cluster> + </clusters> + + <tags>_falcon_mirroring_type=HIVE</tags> + + <parallel>1</parallel> + <!-- Replication needs to run only once to catch up --> + <order>LAST_ONLY</order> + <frequency>##process.frequency##</frequency> + <timezone>UTC</timezone> + + <properties> + <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> + </properties> + + <workflow name="##workflow.name##" engine="oozie" + path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/> + <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/> + <ACL/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml new file mode 100644 index 0000000..7362c2e --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure-workflow.xml @@ -0,0 +1,401 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> +<workflow-app xmlns='uri:oozie:workflow:0.3' name='falcon-dr-hive-workflow'> + <credentials> + <credential name='hive_src_credentials' type='hcat'> + <property> + <name>hcat.metastore.uri</name> + <value>${sourceMetastoreUri}</value> + </property> + <property> + <name>hcat.metastore.principal</name> + <value>${sourceHiveMetastoreKerberosPrincipal}</value> + </property> + </credential> + <credential name='hive_tgt_credentials' type='hcat'> + <property> + <name>hcat.metastore.uri</name> + <value>${targetMetastoreUri}</value> + </property> + <property> + <name>hcat.metastore.principal</name> + <value>${targetHiveMetastoreKerberosPrincipal}</value> + </property> + </credential> + <credential name="hive2_src_credentials" type="hive2"> + <property> + <name>hive2.server.principal</name> + <value>${sourceHive2KerberosPrincipal}</value> + </property> + <property> + <name>hive2.jdbc.url</name> + <value>jdbc:${sourceHiveServer2Uri}/${sourceDatabase}</value> + </property> + </credential> + <credential name="hive2_tgt_credentials" type="hive2"> + <property> + <name>hive2.server.principal</name> + <value>${targetHive2KerberosPrincipal}</value> + </property> + <property> + <name>hive2.jdbc.url</name> + <value>jdbc:${targetHiveServer2Uri}/${sourceDatabase}</value> + </property> + </credential> + </credentials> + <start to='last-event'/> + <action name="last-event" cred="hive_tgt_credentials"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp,hive,hive2,hcatalog</value> + </property> + <property> + <name>oozie.launcher.mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + <property> + <name>mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + </configuration> + <main-class>org.apache.falcon.hive.HiveDRTool</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-falconLibPath</arg> + <arg>${wf:conf("falcon.libpath")}</arg> + <arg>-sourceCluster</arg> + <arg>${sourceCluster}</arg> + <arg>-sourceMetastoreUri</arg> + <arg>${sourceMetastoreUri}</arg> + <arg>-sourceHiveServer2Uri</arg> + <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceDatabase</arg> + <arg>${sourceDatabase}</arg> + <arg>-sourceTable</arg> + <arg>${sourceTable}</arg> + <arg>-sourceStagingPath</arg> + <arg>${sourceStagingPath}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-sourceNNKerberosPrincipal</arg> + <arg>${sourceNNKerberosPrincipal}</arg> + <arg>-sourceHiveMetastoreKerberosPrincipal</arg> + <arg>${sourceHiveMetastoreKerberosPrincipal}</arg> + <arg>-sourceHive2KerberosPrincipal</arg> + <arg>${sourceHive2KerberosPrincipal}</arg> + <arg>-targetCluster</arg> + <arg>${targetCluster}</arg> + <arg>-targetMetastoreUri</arg> + <arg>${targetMetastoreUri}</arg> + <arg>-targetHiveServer2Uri</arg> + <arg>${targetHiveServer2Uri}</arg> + <arg>-targetStagingPath</arg> + <arg>${targetStagingPath}</arg> + <arg>-targetNN</arg> + 
<arg>${targetNN}</arg> + <arg>-targetNNKerberosPrincipal</arg> + <arg>${targetNNKerberosPrincipal}</arg> + <arg>-targetHiveMetastoreKerberosPrincipal</arg> + <arg>${targetHiveMetastoreKerberosPrincipal}</arg> + <arg>-targetHive2KerberosPrincipal</arg> + <arg>${targetHive2KerberosPrincipal}</arg> + <arg>-maxEvents</arg> + <arg>${maxEvents}</arg> + <arg>-clusterForJobRun</arg> + <arg>${clusterForJobRun}</arg> + <arg>-clusterForJobRunWriteEP</arg> + <arg>${clusterForJobRunWriteEP}</arg> + <arg>-clusterForJobNNKerberosPrincipal</arg> + <arg>${clusterForJobNNKerberosPrincipal}</arg> + <arg>-drJobName</arg> + <arg>${drJobName}-${nominalTime}</arg> + <arg>-executionStage</arg> + <arg>lastevents</arg> + </java> + <ok to="export-dr-replication"/> + <error to="failure"/> + </action> + <!-- Export Replication action --> + <action name="export-dr-replication" cred="hive_src_credentials,hive2_src_credentials"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp,hive,hive2,hcatalog</value> + </property> + <property> + <name>oozie.launcher.mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + <property> + <name>mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + </configuration> + <main-class>org.apache.falcon.hive.HiveDRTool</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-falconLibPath</arg> + <arg>${wf:conf("falcon.libpath")}</arg> + <arg>-replicationMaxMaps</arg> + <arg>${replicationMaxMaps}</arg> + <arg>-distcpMaxMaps</arg> + <arg>${distcpMaxMaps}</arg> + <arg>-sourceCluster</arg> + <arg>${sourceCluster}</arg> + <arg>-sourceMetastoreUri</arg> + <arg>${sourceMetastoreUri}</arg> + <arg>-sourceHiveServer2Uri</arg> + <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceDatabase</arg> + <arg>${sourceDatabase}</arg> + <arg>-sourceTable</arg> + <arg>${sourceTable}</arg> + <arg>-sourceStagingPath</arg> + <arg>${sourceStagingPath}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-sourceNNKerberosPrincipal</arg> + <arg>${sourceNNKerberosPrincipal}</arg> + <arg>-sourceHiveMetastoreKerberosPrincipal</arg> + <arg>${sourceHiveMetastoreKerberosPrincipal}</arg> + <arg>-sourceHive2KerberosPrincipal</arg> + <arg>${sourceHive2KerberosPrincipal}</arg> + <arg>-targetCluster</arg> + <arg>${targetCluster}</arg> + <arg>-targetMetastoreUri</arg> + <arg>${targetMetastoreUri}</arg> + <arg>-targetHiveServer2Uri</arg> + <arg>${targetHiveServer2Uri}</arg> + <arg>-targetStagingPath</arg> + <arg>${targetStagingPath}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-targetNNKerberosPrincipal</arg> + <arg>${targetNNKerberosPrincipal}</arg> + <arg>-targetHiveMetastoreKerberosPrincipal</arg> + <arg>${targetHiveMetastoreKerberosPrincipal}</arg> + <arg>-targetHive2KerberosPrincipal</arg> + <arg>${targetHive2KerberosPrincipal}</arg> + <arg>-maxEvents</arg> + <arg>${maxEvents}</arg> + <arg>-distcpMapBandwidth</arg> + 
<arg>${distcpMapBandwidth}</arg> + <arg>-clusterForJobRun</arg> + <arg>${clusterForJobRun}</arg> + <arg>-clusterForJobRunWriteEP</arg> + <arg>${clusterForJobRunWriteEP}</arg> + <arg>-clusterForJobNNKerberosPrincipal</arg> + <arg>${clusterForJobNNKerberosPrincipal}</arg> + <arg>-drJobName</arg> + <arg>${drJobName}-${nominalTime}</arg> + <arg>-executionStage</arg> + <arg>export</arg> + </java> + <ok to="import-dr-replication"/> + <error to="failure"/> + </action> + <!-- Import Replication action --> + <action name="import-dr-replication" cred="hive_tgt_credentials,hive2_tgt_credentials"> + <java> + <job-tracker>${jobTracker}</job-tracker> + <name-node>${nameNode}</name-node> + <configuration> + <property> <!-- hadoop 2 parameter --> + <name>oozie.launcher.mapreduce.job.user.classpath.first</name> + <value>true</value> + </property> + <property> + <name>mapred.job.queue.name</name> + <value>${queueName}</value> + </property> + <property> + <name>oozie.launcher.mapred.job.priority</name> + <value>${jobPriority}</value> + </property> + <property> + <name>oozie.use.system.libpath</name> + <value>true</value> + </property> + <property> + <name>oozie.action.sharelib.for.java</name> + <value>distcp,hive,hive2,hcatalog</value> + </property> + <property> + <name>oozie.launcher.mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + <property> + <name>mapreduce.job.hdfs-servers</name> + <value>${sourceNN},${targetNN}</value> + </property> + </configuration> + <main-class>org.apache.falcon.hive.HiveDRTool</main-class> + <arg>-Dmapred.job.queue.name=${queueName}</arg> + <arg>-Dmapred.job.priority=${jobPriority}</arg> + <arg>-falconLibPath</arg> + <arg>${wf:conf("falcon.libpath")}</arg> + <arg>-replicationMaxMaps</arg> + <arg>${replicationMaxMaps}</arg> + <arg>-distcpMaxMaps</arg> + <arg>${distcpMaxMaps}</arg> + <arg>-sourceCluster</arg> + <arg>${sourceCluster}</arg> + <arg>-sourceMetastoreUri</arg> + <arg>${sourceMetastoreUri}</arg> + <arg>-sourceHiveServer2Uri</arg> + <arg>${sourceHiveServer2Uri}</arg> + <arg>-sourceDatabase</arg> + <arg>${sourceDatabase}</arg> + <arg>-sourceTable</arg> + <arg>${sourceTable}</arg> + <arg>-sourceStagingPath</arg> + <arg>${sourceStagingPath}</arg> + <arg>-sourceNN</arg> + <arg>${sourceNN}</arg> + <arg>-sourceNNKerberosPrincipal</arg> + <arg>${sourceNNKerberosPrincipal}</arg> + <arg>-sourceHiveMetastoreKerberosPrincipal</arg> + <arg>${sourceHiveMetastoreKerberosPrincipal}</arg> + <arg>-sourceHive2KerberosPrincipal</arg> + <arg>${sourceHive2KerberosPrincipal}</arg> + <arg>-targetCluster</arg> + <arg>${targetCluster}</arg> + <arg>-targetMetastoreUri</arg> + <arg>${targetMetastoreUri}</arg> + <arg>-targetHiveServer2Uri</arg> + <arg>${targetHiveServer2Uri}</arg> + <arg>-targetStagingPath</arg> + <arg>${targetStagingPath}</arg> + <arg>-targetNN</arg> + <arg>${targetNN}</arg> + <arg>-targetNNKerberosPrincipal</arg> + <arg>${targetNNKerberosPrincipal}</arg> + <arg>-targetHiveMetastoreKerberosPrincipal</arg> + <arg>${targetHiveMetastoreKerberosPrincipal}</arg> + <arg>-targetHive2KerberosPrincipal</arg> + <arg>${targetHive2KerberosPrincipal}</arg> + <arg>-maxEvents</arg> + <arg>${maxEvents}</arg> + <arg>-distcpMapBandwidth</arg> + <arg>${distcpMapBandwidth}</arg> + <arg>-clusterForJobRun</arg> + <arg>${clusterForJobRun}</arg> + <arg>-clusterForJobRunWriteEP</arg> + <arg>${clusterForJobRunWriteEP}</arg> + <arg>-clusterForJobNNKerberosPrincipal</arg> + <arg>${clusterForJobNNKerberosPrincipal}</arg> + <arg>-drJobName</arg> + 
<arg>${drJobName}-${nominalTime}</arg> + <arg>-executionStage</arg> + <arg>import</arg> + </java> + <ok to="success"/> + <error to="failure"/> + </action> + <decision name="success"> + <switch> + <case to="successAlert"> + ${drNotificationReceivers ne 'NA'} + </case> + <default to="end"/> + </switch> + </decision> + <decision name="failure"> + <switch> + <case to="failureAlert"> + ${drNotificationReceivers ne 'NA'} + </case> + <default to="fail"/> + </switch> + </decision> + <action name="successAlert"> + <email xmlns="uri:oozie:email-action:0.2"> + <to>${drNotificationReceivers}</to> + <subject>INFO: Hive DR workflow ${drJobName} completed successfully</subject> + <body> + The Hive DR workflow ${wf:id()} is successful. + Source = ${sourceCluster} + Target = ${targetCluster} + DB Name = ${sourceDatabase} + Table Name = ${sourceTable} + </body> + </email> + <ok to="end"/> + <error to="end"/> + </action> + <action name="failureAlert"> + <email xmlns="uri:oozie:email-action:0.2"> + <to>${drNotificationReceivers}</to> + <subject>ERROR: Hive DR workflow ${drJobName} failed</subject> + <body> + The Hive DR workflow ${wf:id()} had issues and was killed. The error message is: ${wf:errorMessage(wf:lastErrorNode())} + Source = ${sourceCluster} + Target = ${targetCluster} + DB Name = ${sourceDatabase} + Table Name = ${sourceTable} + </body> + </email> + <ok to="end"/> + <error to="fail"/> + </action> + <kill name="fail"> + <message> + Workflow action failed, error message[${wf:errorMessage(wf:lastErrorNode())}] + </message> + </kill> + <end name="end"/> +</workflow-app> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties new file mode 100644 index 0000000..b2d670a --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-secure.properties @@ -0,0 +1,104 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +##### NOTE: This is a TEMPLATE file which can be copied and edited + +##### Recipe properties +falcon.recipe.name=hive-disaster-recovery + + +##### Workflow properties +falcon.recipe.workflow.name=hive-dr-workflow +# Provide Wf absolute path. This can be HDFS or local FS path. 
If the workflow is on the local FS, it will be copied to HDFS +falcon.recipe.workflow.path=/recipes/hive-replication/hive-disaster-recovery-secure-workflow.xml + +##### Cluster properties + +# Change the cluster name where replication job should run here +falcon.recipe.cluster.name=backupCluster +# Change the cluster hdfs write end point here. This is mandatory. +falcon.recipe.cluster.hdfs.writeEndPoint=hdfs://localhost:8020 +# Change the cluster validity start time here +falcon.recipe.cluster.validity.start=2014-10-01T00:00Z +# Change the cluster validity end time here +falcon.recipe.cluster.validity.end=2016-12-30T00:00Z +# Change the cluster namenode kerberos principal. This is mandatory on secure clusters. +falcon.recipe.nn.principal=nn/[email protected] + +##### Scheduling properties + +# Change the process frequency here. Valid frequency types are minutes, hours, days, months +falcon.recipe.process.frequency=minutes(60) + +##### Retry policy properties + +falcon.recipe.retry.policy=periodic +falcon.recipe.retry.delay=minutes(30) +falcon.recipe.retry.attempts=3 + +##### Tag properties - An optional list of comma separated key=value tags +##### Uncomment to add tags +#falcon.recipe.tags=owner=landing,pipeline=adtech + +##### ACL properties - Uncomment and change ACL if authorization is enabled + +#falcon.recipe.acl.owner=testuser +#falcon.recipe.acl.group=group +#falcon.recipe.acl.permission=0x755 + +##### Custom Job properties + +##### Source Cluster DR properties +sourceCluster=primaryCluster +sourceMetastoreUri=thrift://localhost:9083 +sourceHiveServer2Uri=hive2://localhost:10000 +# For DB level replication to replicate multiple databases, specify a comma separated list of databases +sourceDatabase=default +# For DB level replication specify * for sourceTable. +# For table level replication to replicate multiple tables, specify a comma separated list of tables +sourceTable=testtable_dr +sourceStagingPath=/apps/hive/tools/dr +sourceNN=hdfs://localhost:8020 +# Specify kerberos principal required to access source namenode and hive servers, optional on non-secure cluster. +sourceNNKerberosPrincipal=nn/[email protected] +sourceHiveMetastoreKerberosPrincipal=hive/[email protected] +sourceHive2KerberosPrincipal=hive/[email protected] + +##### Target Cluster DR properties +targetCluster=backupCluster +targetMetastoreUri=thrift://localhost:9083 +targetHiveServer2Uri=hive2://localhost:10000 +targetStagingPath=/apps/hive/tools/dr +targetNN=hdfs://localhost:8020 +# Specify kerberos principal required to access target namenode and hive servers, optional on non-secure cluster. +targetNNKerberosPrincipal=nn/[email protected] +targetHiveMetastoreKerberosPrincipal=hive/[email protected] +targetHive2KerberosPrincipal=hive/[email protected] + +# Caps the maximum number of events processed in each run of the job. Set it according to your bandwidth limit. +# Setting it to -1 will process all the events but can hog the bandwidth. Use it judiciously! 
+maxEvents=-1 +# Change it to specify the maximum number of mappers for replication +replicationMaxMaps=5 +# Change it to specify the maximum number of mappers for DistCP +distcpMaxMaps=1 +# Change it to specify the bandwidth in MB for each mapper in DistCP +distcpMapBandwidth=100 + +##### Email on failure +drNotificationReceivers=NA \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml ---------------------------------------------------------------------- diff --git a/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml new file mode 100644 index 0000000..3afbef0 --- /dev/null +++ b/addons/recipes/hive-disaster-recovery/src/main/resources/hive-disaster-recovery-template.xml @@ -0,0 +1,44 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<process name="##name##" xmlns="uri:falcon:process:0.1"> + <clusters> + <!-- source --> + <cluster name="##cluster.name##"> + <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/> + </cluster> + </clusters> + + <tags>_falcon_mirroring_type=HIVE</tags> + + <parallel>1</parallel> + <!-- Replication needs to run only once to catch up --> + <order>LAST_ONLY</order> + <frequency>##process.frequency##</frequency> + <timezone>UTC</timezone> + + <properties> + <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> + </properties> + + <workflow name="##workflow.name##" engine="oozie" + path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/> + <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/> + <ACL/> +</process>
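A note on how the recipe template and properties files above fit together: the ##name##, ##cluster.name##, ##cluster.validity.start##, ##cluster.validity.end##, ##process.frequency##, ##workflow.name##, and ##retry.*## placeholders in hive-disaster-recovery-template.xml are filled in from the matching falcon.recipe.* keys of the properties file before the resulting process entity is submitted to Falcon. The Java sketch below only illustrates that substitution, assuming a simple "strip the falcon.recipe. prefix and wrap in ##...##" mapping; it is not the actual Falcon recipe tool, and the class name RecipeTemplateFiller and its command-line arguments are hypothetical.

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

/**
 * Illustrative sketch only: fills ##token## placeholders in a recipe template
 * from falcon.recipe.* properties. Not the actual Falcon recipe tool.
 */
public class RecipeTemplateFiller {

    /** Replaces every ##key## token with the value of falcon.recipe.key, if present. */
    static String fill(String template, Properties props) {
        String result = template;
        for (String name : props.stringPropertyNames()) {
            // Assumed mapping: falcon.recipe.cluster.name -> ##cluster.name##, etc.
            String token = "##" + name.replaceFirst("^falcon\\.recipe\\.", "") + "##";
            result = result.replace(token, props.getProperty(name));
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        // args[0]: path to the ##...## template, args[1]: path to the recipe properties
        String template = new String(Files.readAllBytes(Paths.get(args[0])), StandardCharsets.UTF_8);
        Properties props = new Properties();
        try (InputStream in = Files.newInputStream(Paths.get(args[1]))) {
            props.load(in);
        }
        System.out.println(fill(template, props));
    }
}

With the sample properties above, ##cluster.name## would resolve to backupCluster, ##process.frequency## to minutes(60), and ##retry.policy## to periodic; placeholders with no matching property (for example ##workflow.lib.path##) would simply be left untouched by this sketch.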
