Repository: falcon

Updated Branches:
  refs/heads/master af2ea867d -> 2d5516ec3
FALCON-1486 Add Unit Test cases for HiveDR. Contributed by Peeyush Bishnoi.

Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2d5516ec
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2d5516ec
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2d5516ec

Branch: refs/heads/master
Commit: 2d5516ec374333a7b39de7302c5cd511ba6ba9c8
Parents: af2ea86
Author: Ajay Yadava <[email protected]>
Authored: Tue Nov 17 16:10:38 2015 +0530
Committer: Ajay Yadava <[email protected]>
Committed: Tue Nov 17 16:10:38 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   2 +
 addons/hivedr/pom.xml                           |  12 +
 .../java/org/apache/falcon/hive/HiveDRTool.java |   9 +-
 .../falcon/hive/MetaStoreEventSourcer.java      |  20 +-
 .../java/org/apache/falcon/hive/HiveDRTest.java | 252 +++++++++++++++++++
 pom.xml                                         |   1 +
 6 files changed, 284 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 612b3c9..18cf582 100755
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -16,6 +16,8 @@ Trunk (Unreleased)
     FALCON-1213 Base framework of the native scheduler(Pallavi Rao)
 
   IMPROVEMENTS
+    FALCON-1486 Add Unit Test cases for HiveDR(Peeyush Bishnoi via Ajay Yadava)
+
     FALCON-1592 Code Refactoring: Introduce Event type for scheduler events (Ajay Yadava via Pallavi Rao)
 
     FALCON-1593 Oozie setup failing in setup phase (Praveen Adlakha via Ajay Yadava)


http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/pom.xml
----------------------------------------------------------------------
diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml
index c887166..f98e8c4 100644
--- a/addons/hivedr/pom.xml
+++ b/addons/hivedr/pom.xml
@@ -183,6 +183,18 @@
                 </execution>
             </executions>
         </plugin>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+                <systemProperties>
+                    <property>
+                        <name>derby.stream.error.file</name>
+                        <value>target/derby.log</value>
+                    </property>
+                </systemProperties>
+            </configuration>
+        </plugin>
         </plugins>
     </build>


http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
index c3d9b5c..df16c40 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java
@@ -30,6 +30,7 @@ import org.apache.falcon.hive.util.EventSourcerUtils;
 import org.apache.falcon.hive.util.FileUtils;
 import org.apache.falcon.hive.util.HiveDRStatusStore;
 import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
+import org.apache.hive.hcatalog.api.HCatClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -257,8 +259,11 @@ public class HiveDRTool extends Configured implements Tool {
                 +inputOptions.getJobName()+".id";
         Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile));
         try {
-            defaultSourcer = new MetaStoreEventSourcer(inputOptions.getSourceMetastoreUri(),
-                    inputOptions.getSourceMetastoreKerberosPrincipal(), inputOptions.getSourceHive2KerberosPrincipal(),
+            HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(
+                    inputOptions.getSourceMetastoreUri(),
+                    inputOptions.getSourceMetastoreKerberosPrincipal(),
+                    inputOptions.getSourceHive2KerberosPrincipal());
+            defaultSourcer = new MetaStoreEventSourcer(sourceMetastoreClient,
                     new DefaultPartitioner(drStore, eventSoucerUtil), eventSoucerUtil, lastEventsIdMap);
             inputFilename = defaultSourcer.sourceEvents(inputOptions);
         } finally {
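
The HiveDRTool change above is the enabling refactoring for the new tests: the HCatClient is now built by the caller (via HiveMetastoreUtils) and handed to MetaStoreEventSourcer, instead of the sourcer constructing it from a metastore URI and Kerberos principals. A minimal sketch of what this allows, assuming a metastore reachable through a default HiveConf; the eventSourcerUtils value is a placeholder, and the two nulls mirror what HiveDRTest passes:

    // A test (or any caller) can now inject its own client, e.g. one backed
    // by a local embedded metastore, with no Kerberos setup involved.
    HCatClient client = HCatClient.create(new HiveConf());
    MetaStoreEventSourcer sourcer = new MetaStoreEventSourcer(
            client,             // injected instead of built inside the sourcer
            null,               // Partitioner; HiveDRTest passes null
            eventSourcerUtils,  // placeholder EventSourcerUtils instance
            null);              // last-events map; HiveDRTest passes null
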
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
index 0e12e89..f008883 100644
--- a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
@@ -21,7 +21,6 @@ package org.apache.falcon.hive;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.hive.util.EventSourcerUtils;
 import org.apache.falcon.hive.util.HiveDRUtils;
-import org.apache.falcon.hive.util.HiveMetastoreUtils;
 import org.apache.hive.hcatalog.api.HCatClient;
 import org.apache.hive.hcatalog.api.repl.ReplicationTask;
 import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
@@ -49,14 +48,11 @@ public class MetaStoreEventSourcer implements EventSourcer {
     private long lastCounter;
 
     /* TODO handle cases when no events. files will be empty and lists will be empty */
-    public MetaStoreEventSourcer(String sourceMetastoreUri, String sourceMetastoreKerberosPrincipal,
-                                 String sourceHive2KerberosPrincipal, Partitioner partitioner,
-                                 EventSourcerUtils eventSourcerUtils, Map<String, Long> lastEventsIdMap)
-        throws Exception {
-
-        sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(sourceMetastoreUri,
-            sourceMetastoreKerberosPrincipal, sourceHive2KerberosPrincipal);
-        eventMetadata = new ReplicationEventMetadata();
+    public MetaStoreEventSourcer(HCatClient sourceMetastoreClient, Partitioner partitioner,
+                                 EventSourcerUtils eventSourcerUtils,
+                                 Map<String, Long> lastEventsIdMap) throws Exception {
+        this.sourceMetastoreClient = sourceMetastoreClient;
+        this.eventMetadata = new ReplicationEventMetadata();
         this.partitioner = partitioner;
         this.eventSourcerUtils = eventSourcerUtils;
         this.lastEventsIdMap = lastEventsIdMap;
@@ -149,7 +145,7 @@ public class MetaStoreEventSourcer implements EventSourcer {
     }
 
-    private void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
+    protected void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
                                                String tableName, String srcStagingDirProvider,
                                                String dstStagingDirProvider) throws Exception {
         String srcFilename = null;
@@ -196,6 +192,10 @@ public class MetaStoreEventSourcer implements EventSourcer {
             EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename);
     }
 
+    public String persistToMetaFile(String jobName) throws Exception {
+        return eventSourcerUtils.persistToMetaFile(eventMetadata, jobName);
+    }
+
     public void cleanUp() throws Exception {
         if (sourceMetastoreClient != null) {
             sourceMetastoreClient.close();
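
The two visibility changes above exist for testability: processTableReplicationEvents goes from private to protected, and persistToMetaFile becomes a public pass-through to EventSourcerUtils. Judging from the assertions in HiveDRTest below, the persisted meta file carries one record of four FIELD_DELIM-separated fields; a sketch of reading it back (readEventFile stands in for any helper that slurps the file into a String):

    String record = readEventFile(new Path(sourcer.persistToMetaFile("hiveReplTest")));
    String[] fields = record.split(DelimiterUtils.FIELD_DELIM);
    String db    = new String(Base64.decodeBase64(fields[0]), "UTF-8"); // source database
    String table = new String(Base64.decodeBase64(fields[1]), "UTF-8"); // source table
    String exportEventsFile = fields[2];  // file of serialized EXPORT commands
    String importEventsFile = fields[3];  // file of serialized IMPORT commands
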
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
new file mode 100644
index 0000000..cdeddaa
--- /dev/null
+++ b/addons/hivedr/src/test/java/org/apache/falcon/hive/HiveDRTest.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import com.google.common.base.Function;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.cluster.util.EmbeddedCluster;
+import org.apache.falcon.hadoop.JailedFileSystem;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.DelimiterUtils;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.HiveDRStatusStore;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.NotificationEvent;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatNotificationEvent;
+import org.apache.hive.hcatalog.api.repl.Command;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.apache.hive.hcatalog.common.HCatConstants;
+import org.apache.hive.hcatalog.messaging.MessageFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.annotation.Nullable;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for Hive DR export and import.
+ */
+public class HiveDRTest {
+    private FileSystem fileSystem;
+    private HCatClient client;
+    private MetaStoreEventSourcer sourcer;
+    private EmbeddedCluster cluster;
+    private String dbName = "testdb";
+    private String tableName = "testtable";
+    private StagingDirectoryProvider stagingDirectoryProvider;
+    private MessageFactory msgFactory = MessageFactory.getInstance();
+
+    @BeforeMethod
+    public void setup() throws Exception {
+        client = HCatClient.create(new HiveConf());
+        initializeFileSystem();
+        sourcer = new MetaStoreEventSourcer(client, null, new EventSourcerUtils(cluster.getConf(),
+                false, "hiveReplTest"), null);
+        stagingDirectoryProvider = new StagingDirectoryProvider.TrivialImpl("/tmp", "/");
+    }
+
+    private void initializeFileSystem() throws Exception {
+        cluster = EmbeddedCluster.newCluster("hivedr");
+        fileSystem = new JailedFileSystem();
+        Path storePath = new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH);
+        fileSystem.initialize(LocalFileSystem.getDefaultUri(cluster.getConf()), cluster.getConf());
+        if (fileSystem.exists(storePath)) {
+            fileSystem.delete(storePath, true);
+        }
+        FileSystem.mkdirs(fileSystem, storePath, DRStatusStore.DEFAULT_STORE_PERMISSION);
+        HiveDRStatusStore drStatusStore = new HiveDRStatusStore(fileSystem,
+                fileSystem.getFileStatus(storePath).getGroup());
+    }
+
+    // Dummy mapping used for all db and table name mappings
+    private Function<String, String> debugMapping = new Function<String, String>() {
+        @Nullable
+        @Override
+        public String apply(@Nullable String s) {
+            if (s == null) {
+                return null;
+            } else {
+                StringBuilder sb = new StringBuilder(s);
+                return sb.toString() + sb.reverse().toString();
+            }
+        }
+    };
+
+    @Test
+    public void testExportImportReplication() throws Exception {
+        Table t = new Table();
+        t.setDbName(dbName);
+        t.setTableName(tableName);
+        NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+                HCatConstants.HCAT_CREATE_TABLE_EVENT, msgFactory.buildCreateTableMessage(t).toString());
+        event.setDbName(t.getDbName());
+        event.setTableName(t.getTableName());
+
+        HCatNotificationEvent hev = new HCatNotificationEvent(event);
+        ReplicationTask rtask = ReplicationTask.create(client, hev);
+
+        Assert.assertEquals(hev.toString(), rtask.getEvent().toString());
+        verifyExportImportReplicationTask(rtask);
+    }
+
+    private void verifyExportImportReplicationTask(ReplicationTask rtask) throws Exception {
+        Assert.assertEquals(true, rtask.needsStagingDirs());
+        Assert.assertEquals(false, rtask.isActionable());
+
+        rtask.withSrcStagingDirProvider(stagingDirectoryProvider)
+                .withDstStagingDirProvider(stagingDirectoryProvider)
+                .withDbNameMapping(debugMapping)
+                .withTableNameMapping(debugMapping);
+
+        List<ReplicationTask> taskAdd = new ArrayList<ReplicationTask>();
+        taskAdd.add(rtask);
+        sourcer.processTableReplicationEvents(taskAdd.iterator(), dbName, tableName,
+                stagingDirectoryProvider.toString(), stagingDirectoryProvider.toString());
+
+        String metaFileName = sourcer.persistToMetaFile("hiveReplTest");
+        String event = readEventFile(new Path(metaFileName));
+        Assert.assertEquals(event.split(DelimiterUtils.FIELD_DELIM).length, 4);
+        Assert.assertEquals(dbName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[0]), "UTF-8"));
+        Assert.assertEquals(tableName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[1]), "UTF-8"));
+
+        String exportStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[2]));
+        String[] commandList = exportStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            for (String stmt : cmd.get()) {
+                Assert.assertTrue(stmt.startsWith("EXPORT TABLE"));
+            }
+        }
+
+        String importStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[3]));
+        commandList = importStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            for (String stmt : cmd.get()) {
+                Assert.assertTrue(stmt.startsWith("IMPORT TABLE"));
+            }
+        }
+    }
+
+    @Test
+    public void testImportReplication() throws Exception {
+        Table t = new Table();
+        t.setDbName("testdb");
+        t.setTableName("testtable");
+        NotificationEvent event = new NotificationEvent(getEventId(), getTime(),
+                HCatConstants.HCAT_DROP_TABLE_EVENT, msgFactory.buildDropTableMessage(t).toString());
+        event.setDbName(t.getDbName());
+        event.setTableName(t.getTableName());
+
+        HCatNotificationEvent hev = new HCatNotificationEvent(event);
+        ReplicationTask rtask = ReplicationTask.create(client, hev);
+
+        Assert.assertEquals(hev.toString(), rtask.getEvent().toString());
+        verifyImportReplicationTask(rtask);
+    }
+
+    private void verifyImportReplicationTask(ReplicationTask rtask) throws Exception {
+        Assert.assertEquals(false, rtask.needsStagingDirs());
+        Assert.assertEquals(true, rtask.isActionable());
+        rtask.withDbNameMapping(debugMapping)
+                .withTableNameMapping(debugMapping);
+
+        List<ReplicationTask> taskAdd = new ArrayList<ReplicationTask>();
+        taskAdd.add(rtask);
+        sourcer.processTableReplicationEvents(taskAdd.iterator(), dbName, tableName,
+                stagingDirectoryProvider.toString(), stagingDirectoryProvider.toString());
+        String persistFileName = sourcer.persistToMetaFile("hiveReplTest");
+        String event = readEventFile(new Path(persistFileName));
+
+        Assert.assertEquals(event.split(DelimiterUtils.FIELD_DELIM).length, 4);
+        Assert.assertEquals(dbName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[0]), "UTF-8"));
+        Assert.assertEquals(tableName,
+                new String(Base64.decodeBase64(event.split(DelimiterUtils.FIELD_DELIM)[1]), "UTF-8"));
+
+        String exportStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[2]));
+        String[] commandList = exportStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            Assert.assertEquals(cmd.get().size(), 0); // In case of drop, the export command list is empty: a metadata-only operation
+        }
+
+        String importStr = readEventFile(new Path(event.split(DelimiterUtils.FIELD_DELIM)[3]));
+        commandList = importStr.split(DelimiterUtils.NEWLINE_DELIM);
+        for (String command : commandList) {
+            Command cmd = ReplicationUtils.deserializeCommand(command);
+            Assert.assertEquals(cmd.getEventId(), 42);
+            for (String stmt : cmd.get()) {
+                Assert.assertTrue(stmt.startsWith("DROP TABLE"));
+            }
+        }
+    }
+
+    private long getEventId() {
+        // Does not need to be unique, just a non-zero distinct value to test against.
+        return 42;
+    }
+
+    private int getTime() {
+        // Does not need to be actual time, just a non-zero distinct value to test against.
+        return 1729;
+    }
+
+    private String readEventFile(Path eventFileName) throws IOException {
+        StringBuilder eventString = new StringBuilder();
+        BufferedReader in = new BufferedReader(new InputStreamReader(
+                fileSystem.open(eventFileName)));
+        try {
+            String line;
+            while ((line = in.readLine()) != null) {
+                eventString.append(line);
+            }
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            IOUtils.closeQuietly(in);
+        }
+        return eventString.toString();
+    }
+
+    @AfterMethod
+    public void tearDown() throws Exception {
+        client.close();
+    }
+
+}
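
The tests above depend on the surefire configuration added to addons/hivedr/pom.xml, which redirects Derby's error stream to target/derby.log. Assuming a standard Maven surefire setup, the class can be run in isolation with:

    mvn -pl addons/hivedr test -Dtest=HiveDRTest
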
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d5516ec/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 6f2c480..dfaf1c1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -307,6 +307,7 @@
                         <exclude>**/maven-eclipse.xml</exclude>
                         <exclude>**/.externalToolBuilders/**</exclude>
                         <exclude>html5-ui/**</exclude>
+                        <exclude>**/metastore_db/**</exclude>
                         <exclude>**/db1.log</exclude>
                         <exclude>**/db1.properties</exclude>
                         <exclude>**/db1.script</exclude>
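
The **/metastore_db/** exclude above rounds out the test support: HCatClient.create(new HiveConf()) with default settings brings up an embedded Derby-backed metastore, and assuming that default behavior it leaves generated artifacts behind that the build's exclude list and the derby.log redirection both account for:

    addons/hivedr/
    |-- metastore_db/       generated by the embedded Derby-backed metastore
    `-- target/derby.log    Derby error stream, per the new surefire property
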
