FALCON-1188 Falcon support for Hive Replication. Contributed by Venkat Ranganathan.
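Building with the new hivedr module only requires adding the hadoop-2 and hivedr profiles to the usual Falcon Maven build, as the updated Installation-steps.txt below notes (Hive >= 1.2.0 and Oozie >= 4.2.0 must be available). A minimal sketch, assuming the standard mvn clean install build; the Hadoop version shown is purely illustrative:

    # build Falcon with the Hive Replication addon enabled
    mvn clean install -P hadoop-2,hivedr -Dhadoop.version=2.6.0 -DskipTests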
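The table bootstrap described in the new addons/hivedr/README (further down in this patch) amounts to a Hive EXPORT on the source cluster, a distcp of the exported directory, and an IMPORT on the destination. A rough sketch of that sequence; the HiveServer2 endpoints, NameNode URIs, database/table names, and staging paths here are purely illustrative:

    # source cluster: export the table to a staging directory
    beeline -u "jdbc:hive2://source-hs2:10000/sales" \
        -e "EXPORT TABLE orders TO '/apps/hive/staging/orders_export';"

    # copy the exported metadata and data to the destination cluster
    hadoop distcp hdfs://source-nn:8020/apps/hive/staging/orders_export \
        hdfs://target-nn:8020/apps/hive/staging/orders_export

    # destination cluster: import, creating the table that subsequent events will be replayed onto
    beeline -u "jdbc:hive2://target-hs2:10000/sales" \
        -e "IMPORT TABLE orders FROM '/apps/hive/staging/orders_export';"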
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cc1d3840 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cc1d3840 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cc1d3840 Branch: refs/heads/0.7 Commit: cc1d3840dd2184f5a2ac5452115d370aab80997e Parents: 7f6cc6d Author: Ajay Yadava <[email protected]> Authored: Tue Aug 11 13:49:02 2015 +0530 Committer: Ajay Yadava <[email protected]> Committed: Thu Aug 13 14:37:02 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + Installation-steps.txt | 4 + addons/hivedr/README | 80 ++++ addons/hivedr/pom.xml | 207 ++++++++++ .../apache/falcon/hive/DefaultPartitioner.java | 317 +++++++++++++++ .../org/apache/falcon/hive/EventSourcer.java | 31 ++ .../java/org/apache/falcon/hive/HiveDRArgs.java | 121 ++++++ .../org/apache/falcon/hive/HiveDROptions.java | 175 ++++++++ .../java/org/apache/falcon/hive/HiveDRTool.java | 378 +++++++++++++++++ .../falcon/hive/LastReplicatedEvents.java | 189 +++++++++ .../falcon/hive/MetaStoreEventSourcer.java | 202 ++++++++++ .../org/apache/falcon/hive/Partitioner.java | 42 ++ .../falcon/hive/ReplicationEventMetadata.java | 34 ++ .../exception/HiveReplicationException.java | 49 +++ .../falcon/hive/mapreduce/CopyCommitter.java | 65 +++ .../falcon/hive/mapreduce/CopyMapper.java | 93 +++++ .../falcon/hive/mapreduce/CopyReducer.java | 85 ++++ .../falcon/hive/util/DBReplicationStatus.java | 213 ++++++++++ .../apache/falcon/hive/util/DRStatusStore.java | 104 +++++ .../apache/falcon/hive/util/DelimiterUtils.java | 30 ++ .../falcon/hive/util/EventSourcerUtils.java | 189 +++++++++ .../org/apache/falcon/hive/util/EventUtils.java | 361 +++++++++++++++++ .../org/apache/falcon/hive/util/FileUtils.java | 68 ++++ .../falcon/hive/util/HiveDRStatusStore.java | 315 +++++++++++++++ .../apache/falcon/hive/util/HiveDRUtils.java | 86 ++++ .../falcon/hive/util/HiveMetastoreUtils.java | 92 +++++ .../falcon/hive/util/ReplicationStatus.java | 221 ++++++++++ addons/hivedr/src/main/resources/log4j.xml | 54 +++ .../falcon/hive/DBReplicationStatusTest.java | 230 +++++++++++ .../java/org/apache/falcon/hive/DRTest.java | 45 +++ .../falcon/hive/HiveDRStatusStoreTest.java | 346 ++++++++++++++++ .../falcon/hive/ReplicationStatusTest.java | 137 +++++++ .../resources/hdfs-replication-template.xml | 23 +- .../resources/hdfs-replication-workflow.xml | 60 ++- .../main/resources/hdfs-replication.properties | 68 ++-- .../recipes/hive-disaster-recovery/README.txt | 58 +++ addons/recipes/hive-disaster-recovery/pom.xml | 32 ++ .../hive-disaster-recovery-secure-template.xml | 44 ++ .../hive-disaster-recovery-secure-workflow.xml | 401 +++++++++++++++++++ .../hive-disaster-recovery-secure.properties | 104 +++++ .../hive-disaster-recovery-template.xml | 44 ++ .../hive-disaster-recovery-workflow.xml | 293 ++++++++++++++ .../resources/hive-disaster-recovery.properties | 94 +++++ client/pom.xml | 6 + .../java/org/apache/falcon/cli/FalconCLI.java | 28 +- .../org/apache/falcon/client/FalconClient.java | 4 +- .../recipe/HdfsReplicationRecipeTool.java | 70 ++++ .../HdfsReplicationRecipeToolOptions.java | 62 +++ .../recipe/HiveReplicationRecipeTool.java | 196 +++++++++ .../HiveReplicationRecipeToolOptions.java | 89 ++++ .../java/org/apache/falcon/recipe/Recipe.java | 29 ++ .../org/apache/falcon/recipe/RecipeFactory.java | 44 ++ .../org/apache/falcon/recipe/RecipeTool.java | 144 +++---- .../apache/falcon/recipe/RecipeToolArgs.java | 5 +- 
.../apache/falcon/recipe/RecipeToolOptions.java | 30 +- .../recipe/util/RecipeProcessBuilderUtils.java | 272 +++++++++++++ docs/src/site/twiki/InstallationSteps.twiki | 2 + .../service/SharedLibraryHostingService.java | 16 +- pom.xml | 120 ++++-- replication/pom.xml | 10 +- src/main/assemblies/distributed-package.xml | 15 + src/main/assemblies/standalone-package.xml | 16 + test-tools/hadoop-webapp/pom.xml | 23 ++ webapp/pom.xml | 14 +- .../java/org/apache/falcon/cli/FalconCLIIT.java | 195 +++++---- .../resources/hdfs-replication-template.xml | 43 ++ .../test/resources/hdfs-replication.properties | 45 +++ webapp/src/test/resources/process.properties | 25 -- 68 files changed, 6986 insertions(+), 303 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index a6dc95b..fc7ee03 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,6 +4,8 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1188 Falcon support for Hive Replication(Venkat Ranganathan via Ajay Yadava) + FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs(Pavan Kumar Kolamuri via Ajay Yadava) FALCON-1039 Add instance dependency API in falcon (Ajay Yadava) http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/Installation-steps.txt ---------------------------------------------------------------------- diff --git a/Installation-steps.txt b/Installation-steps.txt index bb92a85..b86d6a1 100644 --- a/Installation-steps.txt +++ b/Installation-steps.txt @@ -43,6 +43,8 @@ a. Building falcon from the source release [optionally -Dhadoop.version=<<hadoop.version>> can be appended to build for a specific version of hadoop] *Note:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards Falcon build with JDK 1.7 using -noverify option + To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this + Hive >= 1.2.0 and Oozie >= 4.2.0 should be available. b. Building falcon from the source repository @@ -55,6 +57,8 @@ b. Building falcon from the source repository [optionally -Dhadoop.version=<<hadoop.version>> can be appended to build for a specific version of hadoop] *Note:* Falcon drops support for Hadoop-1 and only supports Hadoop-2 from Falcon 0.6 onwards Falcon build with JDK 1.7 using -noverify option + To compile Falcon with Hive Replication, optionally "-P hadoop-2,hivedr" can be appended. For this + Hive >= 1.2.0 and Oozie >= 4.2.0 should be available. 2. Deploying Falcon http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/README ---------------------------------------------------------------------- diff --git a/addons/hivedr/README b/addons/hivedr/README new file mode 100644 index 0000000..0b448d3 --- /dev/null +++ b/addons/hivedr/README @@ -0,0 +1,80 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +Hive Disaster Recovery +======================= + +Overview +--------- + +Falcon provides a feature to replicate Hive metadata and data events from one Hadoop cluster +to another cluster. This is supported for both secure and unsecure clusters through Falcon Recipes. + + +Prerequisites +------------- + +The following are the prerequisites to use Hive DR: + +* Hive 1.2.0+ +* Oozie 4.2.0+ + +*Note:* Set the following properties in hive-site.xml for replicating the Hive events: + <property> + <name>hive.metastore.event.listeners</name> + <value>org.apache.hive.hcatalog.listener.DbNotificationListener</value> + <description>event listeners that are notified of any metastore changes</description> + </property> + + <property> + <name>hive.metastore.dml.events</name> + <value>true</value> + </property> + + +Usage +------ +a. Perform initial bootstrap of Table and Database from one Hadoop cluster to another Hadoop cluster + + Table Bootstrap + ---------------- + For bootstrapping table replication, after having turned on the DbNotificationListener + on the source db, do an EXPORT of the table, distcp the export over to the destination + warehouse, and do an IMPORT over there. Check the Hive Export-Import documentation for syntax details + and examples. + + This will set up the destination table so that the events on the source cluster that modify the table + will then be replicated over. + + Database Bootstrap + ------------------ + For bootstrapping DB replication, the destination DB should be created first. This step is expected, + since DB replication definitions can be set up by users only on pre-existing DBs. Second, we need + to export all tables in the source db and import them in the destination db, as described above. + + +b. Set up the cluster definition + $FALCON_HOME/bin/falcon entity -submit -type cluster -file /cluster/definition.xml + +c. Submit the Hive DR recipe + $FALCON_HOME/bin/falcon recipe -name hive-disaster-recovery -operation HIVE_DISASTER_RECOVERY + + +Recipe templates for Hive DR are available in addons/recipes/hive-disaster-recovery; copy them to the +recipe path specified in client.properties. + +*Note:* If Kerberos security is enabled on the cluster, use the secure templates for Hive DR from + addons/recipes/hive-disaster-recovery \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/pom.xml ---------------------------------------------------------------------- diff --git a/addons/hivedr/pom.xml b/addons/hivedr/pom.xml new file mode 100644 index 0000000..29780ad --- /dev/null +++ b/addons/hivedr/pom.xml @@ -0,0 +1,207 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License.
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-main</artifactId> + <version>0.7-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>falcon-hive-replication</artifactId> + <description>Apache Falcon Hive Replication Module</description> + <name>Apache Falcon Hive Replication</name> + <packaging>jar</packaging> + + <dependencies> + <!-- dependencies are always listed in sorted order by groupId, artifactId --> + <!-- intra-project --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-common</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-exec</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-metastore</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-shims</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive.hcatalog</groupId> + <artifactId>hive-webhcat-java-client</artifactId> + <version>${hive.version}</version> + </dependency> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-jdbc</artifactId> + <version>${hive.version}</version> + <classifier>standalone</classifier> + </dependency> + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + <!-- inter-project --> + <dependency> + <groupId>javax.jms</groupId> + <artifactId>jms</artifactId> + </dependency> + <!-- test intra-project --> + <dependency> + <groupId>org.apache.hive</groupId> + <artifactId>hive-cli</artifactId> + <version>${hive.version}</version> + <scope>test</scope> + </dependency> + <!-- test inter-project --> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>3.8.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-core</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>kahadb</artifactId> + <version>${activemq.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-test-util</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-hadoop-dependencies</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <profiles> + <profile> + <id>hadoop-2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <dependencies> + + 
<dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <scope>compile</scope> + <version>${hadoop.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-common</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-resourcemanager</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-nodemanager</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + <scope>compile</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-distcp</artifactId> + <scope>compile</scope> + </dependency> + </dependencies> + </profile> + </profiles> + + <build> + <sourceDirectory>${basedir}/src/main/java</sourceDirectory> + <testSourceDirectory>${basedir}/src/test/java</testSourceDirectory> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <executions> + <execution> + <goals> + <goal>test-jar</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + +</project> http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java new file mode 100644 index 0000000..ce4bfab --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java @@ -0,0 +1,317 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.hive; + +import com.google.common.collect.Lists; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.util.DRStatusStore; +import org.apache.falcon.hive.util.EventSourcerUtils; +import org.apache.falcon.hive.util.HiveDRUtils; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationTask; +import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.OutputStream; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import static org.apache.hive.hcatalog.api.HCatNotificationEvent.Scope; + +/** + * Partitioner for partitioning events for a given DB. + */ +public class DefaultPartitioner implements Partitioner { + + private static final Logger LOG = LoggerFactory.getLogger(DefaultPartitioner.class); + private EventFilter eventFilter; + private final DRStatusStore drStore; + private final EventSourcerUtils eventSourcerUtils; + + private enum CMDTYPE { + SRC_CMD_TYPE, + TGT_CMD_TYPE + } + + public DefaultPartitioner(DRStatusStore drStore, EventSourcerUtils eventSourcerUtils) { + this.drStore = drStore; + this.eventSourcerUtils = eventSourcerUtils; + } + + private class EventFilter { + private final Map<String, Long> eventFilterMap; + + public EventFilter(String sourceMetastoreUri, String targetMetastoreUri, String jobName, + String database) throws Exception { + eventFilterMap = new HashMap<>(); + Iterator<ReplicationStatus> replStatusIter = drStore.getTableReplicationStatusesInDb(sourceMetastoreUri, + targetMetastoreUri, jobName, database); + while (replStatusIter.hasNext()) { + ReplicationStatus replStatus = replStatusIter.next(); + eventFilterMap.put(replStatus.getTable(), replStatus.getEventId()); + } + } + } + + public ReplicationEventMetadata partition(final HiveDROptions drOptions, final String databaseName, + final Iterator<ReplicationTask> taskIter) throws Exception { + long lastCounter = 0; + String dbName = databaseName.toLowerCase(); + // init filtering before partitioning + this.eventFilter = new EventFilter(drOptions.getSourceMetastoreUri(), drOptions.getTargetMetastoreUri(), + drOptions.getJobName(), dbName); + String srcStagingDirProvider = drOptions.getSourceStagingPath(); + String dstStagingDirProvider = drOptions.getTargetStagingPath(); + + List<Command> dbSrcEventList = Lists.newArrayList(); + List<Command> dbTgtEventList = Lists.newArrayList(); + + Map<String, List<String>> eventMetaFileMap = + new HashMap<>(); + Map<String, List<OutputStream>> outputStreamMap = + new HashMap<>(); + + + String srcFilename = null; + String tgtFilename = null; + OutputStream srcOutputStream = null; + OutputStream tgtOutputStream = null; + + while (taskIter.hasNext()) { + ReplicationTask task = taskIter.next(); + if (task.needsStagingDirs()) { + task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider, + HiveDRUtils.SEPARATOR)); + task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider, + HiveDRUtils.SEPARATOR)); + } + + if (task.isActionable()) { + Scope eventScope = task.getEvent().getEventScope(); + String tableName = task.getEvent().getTableName(); + if (StringUtils.isNotEmpty(tableName)) { + tableName = tableName.toLowerCase(); + } + + boolean firstEventForTable = (eventScope == Scope.TABLE) + && 
isFirstEventForTable(eventMetaFileMap, tableName); + if (firstEventForTable && (task.getSrcWhCommands() != null || task.getDstWhCommands() != null)) { + ++lastCounter; + } + Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> srcCmds = task.getSrcWhCommands(); + if (srcCmds != null) { + if (eventScope == Scope.DB) { + processDBScopeCommands(dbSrcEventList, srcCmds, outputStreamMap, CMDTYPE.SRC_CMD_TYPE); + } else if (eventScope == Scope.TABLE) { + OutputStream srcOut; + if (firstEventForTable) { + srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString(); + srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename); + srcOut = srcOutputStream; + } else { + srcOut = outputStreamMap.get(tableName).get(0); + } + processTableScopeCommands(srcCmds, eventMetaFileMap, tableName, dbSrcEventList, srcOut); + } else { + throw new Exception("Event scope is not DB or Table"); + } + } + + + Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> dstCmds = task.getDstWhCommands(); + if (dstCmds != null) { + if (eventScope == Scope.DB) { + processDBScopeCommands(dbTgtEventList, dstCmds, outputStreamMap, CMDTYPE.TGT_CMD_TYPE); + } else if (eventScope == Scope.TABLE) { + OutputStream tgtOut; + if (firstEventForTable) { + tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString(); + tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename); + tgtOut = tgtOutputStream; + } else { + tgtOut = outputStreamMap.get(tableName).get(1); + } + processTableScopeCommands(dstCmds, eventMetaFileMap, tableName, dbTgtEventList, tgtOut); + } else { + throw new Exception("Event scope is not DB or Table"); + } + } + + // If first table event, update the state data at the end + if (firstEventForTable) { + updateStateDataIfFirstTableEvent(tableName, srcFilename, tgtFilename, srcOutputStream, + tgtOutputStream, eventMetaFileMap, outputStreamMap); + } + } else { + LOG.error("Task is not actionable with event Id : {}", task.getEvent().getEventId()); + } + } + + ReplicationEventMetadata eventMetadata = new ReplicationEventMetadata(); + // If there were only DB events for this run + if (eventMetaFileMap.isEmpty()) { + ++lastCounter; + if (!dbSrcEventList.isEmpty()) { + srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString(); + srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename); + eventSourcerUtils.persistReplicationEvents(srcOutputStream, dbSrcEventList); + } + + if (!dbTgtEventList.isEmpty()) { + tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString(); + tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename); + eventSourcerUtils.persistReplicationEvents(tgtOutputStream, dbTgtEventList); + } + + // Close the stream + eventSourcerUtils.closeOutputStream(srcOutputStream); + eventSourcerUtils.closeOutputStream(tgtOutputStream); + EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, null, srcFilename, tgtFilename); + } else { + closeAllStreams(outputStreamMap); + for (Map.Entry<String, List<String>> entry : eventMetaFileMap.entrySet()) { + String srcFile = null; + String tgtFile = null; + if (entry.getValue() != null) { + srcFile = entry.getValue().get(0); + tgtFile = entry.getValue().get(1); + } + EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, entry.getKey(), srcFile, tgtFile); + } + } + + return eventMetadata; + } + + private void updateStateDataIfFirstTableEvent(final String tableName, final String srcFilename, + final 
String tgtFilename, + final OutputStream srcOutputStream, + final OutputStream tgtOutputStream, + Map<String, List<String>> eventMetaFileMap, + Map<String, List<OutputStream>> outputStreamMap) { + List<String> files = Arrays.asList(srcFilename, tgtFilename); + eventMetaFileMap.put(tableName, files); + + List<OutputStream> streams = Arrays.asList(srcOutputStream, tgtOutputStream); + outputStreamMap.put(tableName, streams); + } + + private void closeAllStreams(final Map<String, List<OutputStream>> outputStreamMap) throws Exception { + if (outputStreamMap == null || outputStreamMap.isEmpty()) { + return; + } + + for (Map.Entry<String, List<OutputStream>> entry : outputStreamMap.entrySet()) { + List<OutputStream> streams = entry.getValue(); + + for (OutputStream out : streams) { + if (out != null) { + eventSourcerUtils.closeOutputStream(out); + } + } + } + } + + private void processDBScopeCommands(final List<Command> dbEventList, final Iterable<? extends org.apache.hive + .hcatalog.api.repl.Command> cmds, final Map<String, List<OutputStream>> outputStreamMap, CMDTYPE cmdType + ) throws Exception { + addCmdsToDBEventList(dbEventList, cmds); + + /* add DB event to all tables */ + if (!outputStreamMap.isEmpty()) { + addDbEventToAllTablesEventFile(cmds, outputStreamMap, cmdType); + } + } + + private void processTableScopeCommands(final Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> cmds, + final Map<String, + List<String>> eventMetaFileMap, String tableName, + final List<Command> dbEventList, final OutputStream out) throws Exception { + // First event for this table + // Before adding this event, add all the DB events + if (isFirstEventForTable(eventMetaFileMap, tableName)) { + addDbEventsToTableEventFile(out, dbEventList, tableName); + } + addTableEventToFile(out, cmds, tableName); + } + + private boolean isFirstEventForTable(final Map<String, + List<String>> eventMetaFileMap, final String tableName) { + List<String> files = eventMetaFileMap.get(tableName); + return (files == null || files.isEmpty()); + } + + private void addCmdsToDBEventList(List<Command> dbEventList, final java.lang.Iterable + <? extends org.apache.hive.hcatalog.api.repl.Command> cmds) { + for (Command cmd : cmds) { + dbEventList.add(cmd); + } + } + + private void addDbEventToAllTablesEventFile( + final java.lang.Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> cmds, + final Map<String, List<OutputStream>> outputStreamMap, final CMDTYPE cmdType) throws Exception { + for (Map.Entry<String, List<OutputStream>> entry : outputStreamMap.entrySet()) { + String tableName = entry.getKey(); + List<OutputStream> streams = entry.getValue(); + OutputStream out; + if (CMDTYPE.SRC_CMD_TYPE == cmdType) { + out = streams.get(0); + } else { + out = streams.get(1); + } + addTableEventToFile(out, cmds, tableName); + } + } + + private void addDbEventsToTableEventFile(final OutputStream out, final List<Command> dbEventList, + final String tableName) throws Exception { + /* First event for the table, add db events before adding this event */ + addTableEventToFile(out, dbEventList, tableName); + } + + private void addTableEventToFile(final OutputStream out, + final java.lang.Iterable<? 
extends org.apache.hive.hcatalog.api.repl.Command> cmds, + final String tableName) throws Exception { + Long eventId = eventFilter.eventFilterMap.get(tableName); + /* If not already processed, add it */ + for (Command cmd : cmds) { + persistEvent(out, eventId, cmd); + } + } + + private void persistEvent(final OutputStream out, final Long eventId, final Command cmd) throws Exception { + if (out == null) { + LOG.debug("persistEvent : out is null"); + return; + } + if (eventId == null || cmd.getEventId() > eventId) { + eventSourcerUtils.persistReplicationEvents(out, cmd); + } + } + + public boolean isPartitioningRequired(final HiveDROptions options) { + return (HiveDRUtils.getReplicationType(options.getSourceTables()) == HiveDRUtils.ReplicationType.DB); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java new file mode 100644 index 0000000..5f3312c --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive; + +/** + * Source events for each table into a file. + */ +public interface EventSourcer { + /** + * @param inputOptions + * @return input filename to mapper + */ + /* Source events for each <db, table> into a file */ + String sourceEvents(HiveDROptions inputOptions) throws Exception; +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java new file mode 100644 index 0000000..1ad6a62 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; + +/** + * Arguments for workflow execution. + */ +public enum HiveDRArgs { + + // source meta store details + SOURCE_CLUSTER("sourceCluster", "source cluster"), + SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"), + SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"), + SOURCE_DATABASE("sourceDatabase", "comma source databases"), + SOURCE_TABLE("sourceTable", "comma source tables"), + SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data"), + + // source hadoop endpoints + SOURCE_NN("sourceNN", "source name node"), + // source security kerberos principals + SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false), + SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal", + "Source hive metastore kerberos principal", false), + SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal", "Source hiveserver2 kerberos principal", false), + + TARGET_CLUSTER("targetCluster", "target cluster"), + // target meta store details + TARGET_METASTORE_URI("targetMetastoreUri", "source meta store uri"), + TARGET_HS2_URI("targetHiveServer2Uri", "source meta store uri"), + + TARGET_STAGING_PATH("targetStagingPath", "source staging path for data"), + + // target hadoop endpoints + TARGET_NN("targetNN", "target name node"), + // target security kerberos principals + TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false), + TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal", + "Target hive metastore kerberos principal", false), + TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", "Target hiveserver2 kerberos principal", false), + + CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal", + "Namenode kerberos principal of cluster on which replication job runs", false), + + // num events + MAX_EVENTS("maxEvents", "number of events to process in this run"), + + // tuning params + REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false), + DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false), + + // Map Bandwidth + DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false), + + JOB_NAME("drJobName", "unique job name"), + + CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"), + CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "cluster where job runs write EP"), + + FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false), + + KEEP_HISTORY("keepHistory", "Keep history of events file generated", false), + EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false); + + private final String name; + private final String description; + private final boolean isRequired; + + HiveDRArgs(String name, String description) { + this(name, description, true); + } + + HiveDRArgs(String name, String description, boolean isRequired) { + this.name = name; + this.description = description; + 
this.isRequired = isRequired; + } + + public Option getOption() { + return new Option(this.name, true, this.description); + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return description; + } + + public boolean isRequired() { + return isRequired; + } + + public String getOptionValue(CommandLine cmd) { + return cmd.getOptionValue(this.name); + } + + @Override + public String toString() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java new file mode 100644 index 0000000..026f6e3 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.GnuParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.exception.HiveReplicationException; + +import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Tool Options. 
+ */ +public class HiveDROptions { + private final Map<HiveDRArgs, String> context; + + protected HiveDROptions(Map<HiveDRArgs, String> context) { + this.context = context; + } + + public String getValue(HiveDRArgs arg) { + return context.get(arg); + } + + public Map<HiveDRArgs, String> getContext() { + return context; + } + + public String getSourceMetastoreUri() { + return context.get(HiveDRArgs.SOURCE_METASTORE_URI); + } + + public String getSourceMetastoreKerberosPrincipal() { + return context.get(HiveDRArgs.SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL); + } + + public String getSourceHive2KerberosPrincipal() { + return context.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL); + } + + public List<String> getSourceDatabases() { + return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(",")); + } + + public List<String> getSourceTables() { + return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(",")); + } + + public String getSourceStagingPath() throws HiveReplicationException { + if (StringUtils.isNotEmpty(context.get(HiveDRArgs.SOURCE_STAGING_PATH))) { + return context.get(HiveDRArgs.SOURCE_STAGING_PATH) + File.separator + getJobName(); + } + throw new HiveReplicationException("Source StagingPath cannot be empty"); + } + + public String getTargetWriteEP() { + return context.get(HiveDRArgs.TARGET_NN); + } + + public String getTargetMetastoreUri() { + return context.get(HiveDRArgs.TARGET_METASTORE_URI); + } + + public String getTargetNNKerberosPrincipal() { + return context.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL); + } + + public String getTargetMetastoreKerberosPrincipal() { + return context.get(HiveDRArgs.TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL); + } + public String getTargetHive2KerberosPrincipal() { + return context.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL); + } + + public String getTargetStagingPath() throws HiveReplicationException { + if (StringUtils.isNotEmpty(context.get(HiveDRArgs.TARGET_STAGING_PATH))) { + return context.get(HiveDRArgs.TARGET_STAGING_PATH) + File.separator + getJobName(); + } + throw new HiveReplicationException("Target StagingPath cannot be empty"); + } + + public String getReplicationMaxMaps() { + return context.get(HiveDRArgs.REPLICATION_MAX_MAPS); + } + + public String getJobName() { + return context.get(HiveDRArgs.JOB_NAME); + } + + public int getMaxEvents() { + return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS)); + } + + public boolean shouldKeepHistory() { + return Boolean.valueOf(context.get(HiveDRArgs.KEEP_HISTORY)); + } + + public String getJobClusterWriteEP() { + return context.get(HiveDRArgs.CLUSTER_FOR_JOB_RUN_WRITE_EP); + } + + public String getJobClusterNNPrincipal() { + return context.get(HiveDRArgs.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL); + } + + public void setSourceStagingDir(String path) { + context.put(HiveDRArgs.SOURCE_STAGING_PATH, path); + } + + public void setTargetStagingDir(String path) { + context.put(HiveDRArgs.TARGET_STAGING_PATH, path); + } + + public String getExecutionStage() { + return context.get(HiveDRArgs.EXECUTION_STAGE); + } + + public boolean shouldBlock() { + return true; + } + + public static HiveDROptions create(String[] args) throws ParseException { + Map<HiveDRArgs, String> options = new HashMap<HiveDRArgs, String>(); + + CommandLine cmd = getCommand(args); + for (HiveDRArgs arg : HiveDRArgs.values()) { + String optionValue = arg.getOptionValue(cmd); + if (StringUtils.isNotEmpty(optionValue)) { + options.put(arg, optionValue); + } + } + + return new 
HiveDROptions(options); + } + + private static CommandLine getCommand(String[] arguments) throws ParseException { + Options options = new Options(); + + for (HiveDRArgs arg : HiveDRArgs.values()) { + addOption(options, arg, arg.isRequired()); + } + + return new GnuParser().parse(options, arguments, false); + } + + private static void addOption(Options options, HiveDRArgs arg, boolean isRequired) { + Option option = arg.getOption(); + option.setRequired(isRequired); + options.addOption(option); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java new file mode 100644 index 0000000..712efe8 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java @@ -0,0 +1,378 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive; + +import org.apache.commons.cli.ParseException; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.mapreduce.CopyMapper; +import org.apache.falcon.hive.mapreduce.CopyReducer; +import org.apache.falcon.hive.util.DRStatusStore; +import org.apache.falcon.hive.util.DelimiterUtils; +import org.apache.falcon.hive.util.EventSourcerUtils; +import org.apache.falcon.hive.util.FileUtils; +import org.apache.falcon.hive.util.HiveDRStatusStore; +import org.apache.falcon.hive.util.HiveDRUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.util.HashMap; +import java.util.Map; + +/** + * DR Tool Driver. 
+ */ +public class HiveDRTool extends Configured implements Tool { + + private static final String META_PATH_FILE_SUFFIX = ".metapath"; + + private FileSystem jobFS; + private FileSystem targetClusterFs; + + private HiveDROptions inputOptions; + private DRStatusStore drStore; + private String eventsMetaFile; + private EventSourcerUtils eventSoucerUtil; + private Configuration jobConf; + private String executionStage; + + public static final FsPermission STAGING_DIR_PERMISSION = + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); + + private static final Logger LOG = LoggerFactory.getLogger(HiveDRTool.class); + + public HiveDRTool() { + } + + @Override + public int run(String[] args) throws Exception { + if (args.length < 1) { + usage(); + return -1; + } + + try { + init(args); + } catch (Throwable e) { + LOG.error("Invalid arguments: ", e); + System.err.println("Invalid arguments: " + e.getMessage()); + usage(); + return -1; + } + + try { + execute(); + } catch (Exception e) { + System.err.println("Exception encountered " + e.getMessage()); + e.printStackTrace(); + LOG.error("Exception encountered, cleaning up staging dirs", e); + cleanup(); + return -1; + } + + if (inputOptions.getExecutionStage().equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { + cleanup(); + } + + return 0; + } + + private void init(String[] args) throws Exception { + LOG.info("Initializing HiveDR"); + inputOptions = parseOptions(args); + LOG.info("Input Options: {}", inputOptions); + + Configuration targetConf = FileUtils.getConfiguration(inputOptions.getTargetWriteEP(), + inputOptions.getTargetNNKerberosPrincipal()); + targetClusterFs = FileSystem.get(targetConf); + jobConf = FileUtils.getConfiguration(inputOptions.getJobClusterWriteEP(), + inputOptions.getJobClusterNNPrincipal()); + jobFS = FileSystem.get(jobConf); + + // init DR status store + drStore = new HiveDRStatusStore(targetClusterFs); + eventSoucerUtil = new EventSourcerUtils(jobConf, inputOptions.shouldKeepHistory(), inputOptions.getJobName()); + } + + private HiveDROptions parseOptions(String[] args) throws ParseException { + return HiveDROptions.create(args); + } + + public Job execute() throws Exception { + assert inputOptions != null; + assert getConf() != null; + executionStage = inputOptions.getExecutionStage(); + LOG.info("Executing Workflow stage : {}", executionStage); + if (executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.LASTEVENTS.name())) { + String lastEventsIdFile = getLastEvents(jobConf); + LOG.info("Last successfully replicated Event file : {}", lastEventsIdFile); + return null; + } else if (executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { + createStagingDirectory(); + eventsMetaFile = sourceEvents(); + LOG.info("Sourced Events meta file : {}", eventsMetaFile); + if (StringUtils.isEmpty(eventsMetaFile)) { + LOG.info("No events to process"); + return null; + } else { + /* + * eventsMetaFile contains the events to be processed by HiveDr. This file should be available + * for the import action as well. Persist the file at a location common to both export and import. 
+ */ + persistEventsMetafileLocation(eventsMetaFile); + } + } else if (executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { + // read the location of eventsMetaFile from hdfs + eventsMetaFile = getEventsMetaFileLocation(); + if (StringUtils.isEmpty(eventsMetaFile)) { + LOG.info("No events to process"); + return null; + } + } else { + throw new HiveReplicationException("Invalid Execution stage : " + inputOptions.getExecutionStage()); + } + + Job job = createJob(); + job.submit(); + + String jobID = job.getJobID().toString(); + job.getConfiguration().set("HIVEDR_JOB_ID", jobID); + + LOG.info("HiveDR job-id: {}", jobID); + if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) { + throw new IOException("HiveDR failure: Job " + jobID + " has failed: " + + job.getStatus().getFailureInfo()); + } + + return job; + } + + private Job createJob() throws Exception { + String jobName = "hive-dr" + executionStage; + String userChosenName = getConf().get(JobContext.JOB_NAME); + if (userChosenName != null) { + jobName += ": " + userChosenName; + } + Job job = Job.getInstance(getConf()); + job.setJobName(jobName); + job.setJarByClass(CopyMapper.class); + job.setMapperClass(CopyMapper.class); + job.setReducerClass(CopyReducer.class); + job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.NLineInputFormat.class); + + job.setOutputFormatClass(NullOutputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + + job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); + job.getConfiguration().set(JobContext.NUM_MAPS, + String.valueOf(inputOptions.getReplicationMaxMaps())); + + for (HiveDRArgs args : HiveDRArgs.values()) { + if (inputOptions.getValue(args) != null) { + job.getConfiguration().set(args.getName(), inputOptions.getValue(args)); + } else { + job.getConfiguration().set(args.getName(), "null"); + } + } + job.getConfiguration().set(FileInputFormat.INPUT_DIR, eventsMetaFile); + + return job; + } + + private void createStagingDirectory() throws IOException, HiveReplicationException { + Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath()); + Path targetStagingPath = new Path(inputOptions.getTargetStagingPath()); + LOG.info("Source staging path: {}", sourceStagingPath); + if (!FileSystem.mkdirs(jobFS, sourceStagingPath, STAGING_DIR_PERMISSION)) { + throw new IOException("mkdir failed for " + sourceStagingPath); + } + + LOG.info("Target staging path: {}", targetStagingPath); + if (!FileSystem.mkdirs(targetClusterFs, targetStagingPath, STAGING_DIR_PERMISSION)) { + throw new IOException("mkdir failed for " + targetStagingPath); + } + } + + private void cleanStagingDirectory() throws HiveReplicationException { + LOG.info("Cleaning staging directories"); + Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath()); + Path targetStagingPath = new Path(inputOptions.getTargetStagingPath()); + try { + if (jobFS.exists(sourceStagingPath)) { + jobFS.delete(sourceStagingPath, true); + } + + if (targetClusterFs.exists(targetStagingPath)) { + targetClusterFs.delete(targetStagingPath, true); + } + } catch (IOException e) { + LOG.error("Unable to cleanup staging dir:", e); + } + } + + private String sourceEvents() throws Exception { + MetaStoreEventSourcer defaultSourcer = null; + String inputFilename = null; + String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH +File.separator+inputOptions.getJobName()+"/" + +inputOptions.getJobName()+".id"; + Map<String, Long> lastEventsIdMap = 
getLastDBTableEvents(new Path(lastEventsIdFile)); + try { + defaultSourcer = new MetaStoreEventSourcer(inputOptions.getSourceMetastoreUri(), + inputOptions.getSourceMetastoreKerberosPrincipal(), inputOptions.getSourceHive2KerberosPrincipal(), + new DefaultPartitioner(drStore, eventSoucerUtil), eventSoucerUtil, lastEventsIdMap); + inputFilename = defaultSourcer.sourceEvents(inputOptions); + } finally { + if (defaultSourcer != null) { + defaultSourcer.cleanUp(); + } + } + + return inputFilename; + } + + private String getLastEvents(Configuration conf) throws Exception { + LastReplicatedEvents lastEvents = new LastReplicatedEvents(conf, + inputOptions.getTargetMetastoreUri(), + inputOptions.getTargetMetastoreKerberosPrincipal(), + inputOptions.getTargetHive2KerberosPrincipal(), + drStore, inputOptions); + String eventIdFile = lastEvents.getLastEvents(inputOptions); + lastEvents.cleanUp(); + return eventIdFile; + } + + private Map<String, Long> getLastDBTableEvents(Path lastEventIdFile) throws Exception { + Map<String, Long> lastEventsIdMap = new HashMap<String, Long>(); + BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(lastEventIdFile))); + try { + String line; + while ((line=in.readLine())!=null) { + String[] field = line.trim().split(DelimiterUtils.TAB_DELIM, -1); + lastEventsIdMap.put(field[0], Long.parseLong(field[1])); + } + } catch (Exception e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(in); + } + + return lastEventsIdMap; + } + + public static void main(String[] args) { + int exitCode; + try { + HiveDRTool hiveDRTool = new HiveDRTool(); + exitCode = ToolRunner.run(HiveDRUtils.getDefaultConf(), hiveDRTool, args); + } catch (Exception e) { + LOG.error("Couldn't complete HiveDR operation: ", e); + exitCode = -1; + } + + System.exit(exitCode); + } + + private void cleanInputDir() { + eventSoucerUtil.cleanUpEventInputDir(); + } + + private synchronized void cleanup() throws HiveReplicationException { + cleanStagingDirectory(); + cleanInputDir(); + cleanTempFiles(); + try { + if (jobFS != null) { + jobFS.close(); + } + if (targetClusterFs != null) { + targetClusterFs.close(); + } + } catch (IOException e) { + LOG.error("Closing FS failed", e); + } + } + + private void cleanTempFiles() { + Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName()); + Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX); + Path eventsFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + ".id"); + + try { + if (jobFS.exists(metaFilePath)) { + jobFS.delete(metaFilePath, true); + } + if (jobFS.exists(eventsFilePath)) { + jobFS.delete(eventsFilePath, true); + } + } catch (IOException e) { + LOG.error("Deleting Temp files failed", e); + } + } + + public void persistEventsMetafileLocation(final String eventMetaFilePath) throws IOException { + Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName()); + Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX); + + OutputStream out = null; + try { + out = FileSystem.create(jobFS, metaFilePath, FileUtils.FS_PERMISSION_700); + out.write(eventMetaFilePath.getBytes()); + out.flush(); + } finally { + IOUtils.closeQuietly(out); + } + } + + private String getEventsMetaFileLocation() throws IOException { + Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName()); + Path metaFilePath = new 
Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX);
+        String line = null;
+        if (jobFS.exists(metaFilePath)) {
+            BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(metaFilePath)));
+            line = in.readLine();
+            in.close();
+        }
+        return line;
+    }
+
+
+    public static void usage() {
+        System.out.println("Usage: hivedrtool -option value ....");
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
new file mode 100644
index 0000000..94f936c
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.falcon.hive.exception.HiveReplicationException;
+import org.apache.falcon.hive.util.DRStatusStore;
+import org.apache.falcon.hive.util.DelimiterUtils;
+import org.apache.falcon.hive.util.FileUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.HCatTable;
+import org.apache.hive.hcatalog.api.repl.ReplicationUtils;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sources meta store change events from Hive.
+ */
+public class LastReplicatedEvents {
+    private static final Logger LOG = LoggerFactory.getLogger(LastReplicatedEvents.class);
+    private final HCatClient targetMetastoreClient;
+    private final DRStatusStore drStore;
+    private final FileSystem jobFS;
+    private Path eventsInputDirPath;
+
+    /* TODO handle cases when no events. files will be empty and lists will be empty */
+    public LastReplicatedEvents(Configuration conf, String targetMetastoreUri,
+                                String targetMetastoreKerberosPrincipal,
+                                String targetHive2KerberosPrincipal,
+                                DRStatusStore drStore, HiveDROptions inputOptions) throws Exception {
+        targetMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(targetMetastoreUri,
+                targetMetastoreKerberosPrincipal, targetHive2KerberosPrincipal);
+        jobFS = FileSystem.get(conf);
+        this.drStore = drStore;
+        init(inputOptions.getJobName());
+    }
+
+    private void init(final String jobName) throws Exception {
+        // Create base dir to store events on cluster where job is running
+        Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH);
+        // Validate base path
+        FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH));
+
+        if (!jobFS.exists(dir)) {
+            if (!jobFS.mkdirs(dir)) {
+                throw new Exception("Creating directory failed: " + dir);
+            }
+        }
+
+        eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName);
+
+        if (!jobFS.exists(eventsInputDirPath)) {
+            if (!jobFS.mkdirs(eventsInputDirPath)) {
+                throw new Exception("Creating directory failed: " + eventsInputDirPath);
+            }
+        }
+    }
+
+    public String getLastEvents(HiveDROptions inputOptions) throws Exception {
+        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
+        LOG.info("Obtaining last events for replicationType : {}", replicationType);
+        HashMap<String, Long> lastEvents = new HashMap<String, Long>();
+        if (replicationType == HiveDRUtils.ReplicationType.DB) {
+            List<String> dbNames = inputOptions.getSourceDatabases();
+            for (String db : dbNames) {
+                lastEvents.put(db, getLastSavedEventId(inputOptions, db, null));
+            }
+        } else {
+            List<String> tableNames = inputOptions.getSourceTables();
+            String db = inputOptions.getSourceDatabases().get(0);
+            for (String tableName : tableNames) {
+                lastEvents.put(db + "." + tableName, getLastSavedEventId(inputOptions, db, tableName));
+            }
+        }
+
+        return persistLastEventsToFile(lastEvents, inputOptions.getJobName());
+    }
+
+    private long getLastSavedEventId(HiveDROptions inputOptions, final String dbName,
+                                     final String tableName) throws Exception {
+        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
+        String jobName = inputOptions.getJobName();
+        String sourceMetastoreUri = inputOptions.getSourceMetastoreUri();
+        String targetMetastoreUri = inputOptions.getTargetMetastoreUri();
+
+        long eventId = 0;
+        if (HiveDRUtils.ReplicationType.DB == replicationType) {
+            eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri,
+                    jobName, dbName).getEventId();
+        } else if (HiveDRUtils.ReplicationType.TABLE == replicationType) {
+            eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri,
+                    jobName, dbName, tableName).getEventId();
+        }
+
+        if (eventId == -1) {
+            if (HiveDRUtils.ReplicationType.DB == replicationType) {
+                /*
+                 * API to get last repl ID for a DB is very expensive, so Hive does not want to make it public.
+                 * HiveDrTool finds last repl id for DB by finding min last repl id of all tables.
+                 */
+                eventId = getLastReplicationIdForDatabase(dbName);
+            } else {
+                HCatTable table = targetMetastoreClient.getTable(dbName, tableName);
+                eventId = ReplicationUtils.getLastReplicationId(table);
+            }
+        }
+        LOG.info("Last saved eventId : {}", eventId);
+        return eventId;
+    }
+
+    private long getLastReplicationIdForDatabase(String databaseName) throws HiveReplicationException {
+        /*
+         * This is a very expensive method and should only be called during first dbReplication instance.
+         */
+        long eventId = Long.MAX_VALUE;
+        try {
+            List<String> tableList = targetMetastoreClient.listTableNamesByPattern(databaseName, "*");
+            for (String tableName : tableList) {
+                long temp = ReplicationUtils.getLastReplicationId(
+                        targetMetastoreClient.getTable(databaseName, tableName));
+                if (temp < eventId) {
+                    eventId = temp;
+                }
+            }
+            return (eventId == Long.MAX_VALUE) ? 0 : eventId;
+        } catch (HCatException e) {
+            throw new HiveReplicationException("Unable to find last replication id for database "
+                    + databaseName, e);
+        }
+    }
+
+    public String persistLastEventsToFile(final HashMap<String, Long> lastEvents,
+                                          final String identifier) throws IOException {
+        if (lastEvents.size()!=0) {
+            Path eventsFile = new Path(eventsInputDirPath.toString(), identifier+".id");
+            OutputStream out = null;
+
+            try {
+                out = FileSystem.create(jobFS, eventsFile, FileUtils.FS_PERMISSION_700);
+                for (Map.Entry<String, Long> entry : lastEvents.entrySet()) {
+                    out.write(entry.getKey().getBytes());
+                    out.write(DelimiterUtils.TAB_DELIM.getBytes());
+                    out.write(String.valueOf(entry.getValue()).getBytes());
+                    out.write(DelimiterUtils.NEWLINE_DELIM.getBytes());
+                }
+                out.flush();
+            } finally {
+                IOUtils.closeQuietly(out);
+            }
+            return jobFS.makeQualified(eventsFile).toString();
+        } else {
+            return null;
+        }
+    }
+
+    public void cleanUp() throws Exception {
+        if (targetMetastoreClient != null) {
+            targetMetastoreClient.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
new file mode 100644
index 0000000..6f3fe8f
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.hive.util.EventSourcerUtils;
+import org.apache.falcon.hive.util.HiveDRUtils;
+import org.apache.falcon.hive.util.HiveMetastoreUtils;
+import org.apache.hive.hcatalog.api.HCatClient;
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider;
+import org.apache.hive.hcatalog.common.HCatException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.OutputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Sources meta store change events from Hive.
+ */
+public class MetaStoreEventSourcer implements EventSourcer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetaStoreEventSourcer.class);
+
+    private final HCatClient sourceMetastoreClient;
+    private final Partitioner partitioner;
+    private final EventSourcerUtils eventSourcerUtils;
+    private final ReplicationEventMetadata eventMetadata;
+    private Map<String, Long> lastEventsIdMap = null;
+    private long lastCounter;
+
+    /* TODO handle cases when no events. files will be empty and lists will be empty */
+    public MetaStoreEventSourcer(String sourceMetastoreUri, String sourceMetastoreKerberosPrincipal,
+                                 String sourceHive2KerberosPrincipal, Partitioner partitioner,
+                                 EventSourcerUtils eventSourcerUtils, Map<String, Long> lastEventsIdMap)
+        throws Exception {
+
+        sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(sourceMetastoreUri,
+                sourceMetastoreKerberosPrincipal, sourceHive2KerberosPrincipal);
+        eventMetadata = new ReplicationEventMetadata();
+        this.partitioner = partitioner;
+        this.eventSourcerUtils = eventSourcerUtils;
+        this.lastEventsIdMap = lastEventsIdMap;
+    }
+
+    public String sourceEvents(HiveDROptions inputOptions) throws Exception {
+        HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables());
+        LOG.info("Sourcing replication events for type : {}", replicationType);
+        if (replicationType == HiveDRUtils.ReplicationType.DB) {
+            List<String> dbNames = inputOptions.getSourceDatabases();
+            for (String db : dbNames) {
+                ++lastCounter;
+                sourceEventsForDb(inputOptions, db);
+            }
+        } else {
+            List<String> tableNames = inputOptions.getSourceTables();
+            String db = inputOptions.getSourceDatabases().get(0);
+            for (String tableName : tableNames) {
+                ++lastCounter;
+                sourceEventsForTable(inputOptions, db, tableName);
+            }
+        }
+
+        if (eventMetadata.getEventFileMetadata() == null || eventMetadata.getEventFileMetadata().isEmpty()) {
+            LOG.info("No events for tables for the request db: {} , Tables : {}", inputOptions.getSourceDatabases(),
+                    inputOptions.getSourceTables());
+            eventSourcerUtils.cleanUpEventInputDir();
+            return null;
+        } else {
+            return eventSourcerUtils.persistToMetaFile(eventMetadata, inputOptions.getJobName());
+        }
+    }
+
+    private void sourceEventsForDb(HiveDROptions inputOptions, String dbName) throws Exception {
+        Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, null),
+                inputOptions.getMaxEvents(), dbName, null);
+        if (replicationTaskIter == null || !replicationTaskIter.hasNext()) {
+            LOG.info("No events for db: {}", dbName);
+        }
+        processEvents(dbName, null, inputOptions, replicationTaskIter);
+    }
+
+    private void sourceEventsForTable(HiveDROptions inputOptions, String dbName, String tableName)
+        throws Exception {
+        Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, tableName),
+                inputOptions.getMaxEvents(), dbName, tableName
+        );
+        if (replicationTaskIter == null || !replicationTaskIter.hasNext()) {
+            LOG.info("No events for db.table: {}.{}", dbName, tableName);
+        }
+        processEvents(dbName, tableName, inputOptions, replicationTaskIter);
+    }
+
+    private void processEvents(String dbName, String tableName, HiveDROptions inputOptions,
+                               Iterator<ReplicationTask> replicationTaskIter) throws Exception {
+        if (partitioner.isPartitioningRequired(inputOptions)) {
+            ReplicationEventMetadata dbEventMetadata = partitioner.partition(inputOptions, dbName, replicationTaskIter);
+
+            if (dbEventMetadata == null || dbEventMetadata.getEventFileMetadata() == null
+                    || dbEventMetadata.getEventFileMetadata().isEmpty()) {
+                LOG.info("No events for db: {} , Table : {}", dbName, tableName);
+            } else {
+                EventSourcerUtils.updateEventMetadata(eventMetadata, dbEventMetadata);
+            }
+        } else {
+            processTableReplicationEvents(replicationTaskIter, dbName, tableName,
+                    inputOptions.getSourceStagingPath(), inputOptions.getTargetStagingPath());
+        }
+    }
+
+    private long getLastSavedEventId(final String dbName, final String tableName) throws Exception {
+        String key = dbName;
+        if (StringUtils.isNotEmpty(tableName)) {
+            key = dbName + "." + tableName;
+        }
+        long eventId = lastEventsIdMap.get(key);
+        LOG.info("LastSavedEventId eventId for {} : {}", key, eventId);
+        return eventId;
+    }
+
+    private Iterator<ReplicationTask> sourceReplicationEvents(long lastEventId, int maxEvents, String dbName,
+                                                              String tableName) throws Exception {
+        try {
+            return sourceMetastoreClient.getReplicationTasks(lastEventId, maxEvents, dbName, tableName);
+        } catch (HCatException e) {
+            throw new Exception("Exception getting replication events " + e.getMessage(), e);
+        }
+    }
+
+
+    private void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName,
+                                               String tableName, String srcStagingDirProvider,
+                                               String dstStagingDirProvider) throws Exception {
+        String srcFilename = null;
+        String tgtFilename = null;
+        OutputStream srcOutputStream = null;
+        OutputStream tgtOutputStream = null;
+
+        while (taskIter.hasNext()) {
+            ReplicationTask task = taskIter.next();
+            if (task.needsStagingDirs()) {
+                task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider,
+                        HiveDRUtils.SEPARATOR));
+                task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider,
+                        HiveDRUtils.SEPARATOR));
+            }
+
+            if (task.isActionable()) {
+                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> srcCmds = task.getSrcWhCommands();
+                if (srcCmds != null) {
+                    if (StringUtils.isEmpty(srcFilename)) {
+                        srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString();
+                        srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename);
+                    }
+                    eventSourcerUtils.persistReplicationEvents(srcOutputStream, srcCmds);
+                }
+
+
+                Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> dstCmds = task.getDstWhCommands();
+                if (dstCmds != null) {
+                    if (StringUtils.isEmpty(tgtFilename)) {
+                        tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString();
+                        tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename);
+                    }
+                    eventSourcerUtils.persistReplicationEvents(tgtOutputStream, dstCmds);
+                }
+
+            } else {
+                LOG.error("Task is not actionable with event Id : {}", task.getEvent().getEventId());
+            }
+        }
+        // Close the stream
+        eventSourcerUtils.closeOutputStream(srcOutputStream);
+        eventSourcerUtils.closeOutputStream(tgtOutputStream);
+        EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename);
+    }
+
+    public void cleanUp() throws Exception {
+        if (sourceMetastoreClient != null) {
+            sourceMetastoreClient.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
new file mode 100644
index 0000000..25b8bd6
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import org.apache.hive.hcatalog.api.repl.ReplicationTask;
+
+import java.util.Iterator;
+
+/**
+ * Partition hive events.
+ */
+public interface Partitioner {
+    /**
+     * Partition events.
+     *
+     * @param options Hive dr options.
+     * @param dbName Database name.
+     * @param replicationTaskIterator Repl task iterator.
+     * @return ReplicationEventMetadata
+     */
+    ReplicationEventMetadata partition(final HiveDROptions options,
+                                       final String dbName,
+                                       final Iterator<ReplicationTask> replicationTaskIterator) throws Exception;
+
+    boolean isPartitioningRequired(final HiveDROptions options);
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
new file mode 100644
index 0000000..79e0ded
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive;
+
+import java.util.Map;
+import java.util.HashMap;
+
+/**
+ * Replication event meta data class.
+ */
+public class ReplicationEventMetadata {
+
+    private Map<String, String> eventFileMetadata = new HashMap<>();
+
+    public Map<String, String> getEventFileMetadata() {
+        return eventFileMetadata;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/cc1d3840/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
----------------------------------------------------------------------
diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
new file mode 100644
index 0000000..0baf6d8
--- /dev/null
+++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.hive.exception;
+
+/**
+ * Wrapper class for HiveReplication exceptions.
+ */
+public class HiveReplicationException extends Exception {
+
+    /**
+     * @param e Exception
+     */
+    public HiveReplicationException(Throwable e) {
+        super(e);
+    }
+
+    public HiveReplicationException(String message, Throwable e) {
+        super(message, e);
+    }
+
+    /**
+     * @param message - custom exception message
+     */
+    public HiveReplicationException(String message) {
+        super(message);
+    }
+
+    /**
+     *
+     */
+    private static final long serialVersionUID = -1475818869309247014L;
+
+}
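Editor's sketch: the file written by LastReplicatedEvents.persistLastEventsToFile() above holds one record per line, a "db" or "db.table" key, a tab, and the last replicated event id. The reader below is illustrative only and is not part of this patch; it assumes DelimiterUtils.TAB_DELIM is a plain tab (mirroring the writer), and the package and class names are hypothetical.

package org.apache.falcon.hive.examples; // hypothetical package, not part of this patch

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.falcon.hive.util.DelimiterUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * Illustrative reader for the events file produced by
 * LastReplicatedEvents.persistLastEventsToFile(): one "db" or "db.table" key,
 * a tab, and the last replicated event id per line.
 */
public final class LastEventsFileReader {

    private LastEventsFileReader() {
    }

    public static Map<String, Long> read(FileSystem fs, Path eventsFile) throws IOException {
        Map<String, Long> lastEventsIdMap = new HashMap<String, Long>();
        // Assumes DelimiterUtils.TAB_DELIM is a literal tab, symmetric with the writer above.
        BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(eventsFile)));
        try {
            String line;
            while ((line = in.readLine()) != null) {
                String[] fields = line.split(DelimiterUtils.TAB_DELIM);
                if (fields.length == 2) {
                    lastEventsIdMap.put(fields[0], Long.parseLong(fields[1]));
                }
            }
        } finally {
            in.close();
        }
        return lastEventsIdMap;
    }
}

The resulting map has the same shape as the lastEventsIdMap that MetaStoreEventSourcer takes in its constructor: keys are "db" or "db.table", exactly as built by getLastEvents() above.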

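The Partitioner contract above can be illustrated with a trivial implementation. This sketch is not the DefaultPartitioner shipped with this patch; it only shows the two methods MetaStoreEventSourcer.processEvents() relies on: when isPartitioningRequired() returns false, partition() is never consulted and events flow through processTableReplicationEvents() instead. The class name is hypothetical.

package org.apache.falcon.hive; // same package as Partitioner; class name is hypothetical

import java.util.Iterator;

import org.apache.hive.hcatalog.api.repl.ReplicationTask;

/**
 * Trivial Partitioner sketch: reports that partitioning is not required, so
 * MetaStoreEventSourcer.processEvents() falls through to
 * processTableReplicationEvents(). Not the DefaultPartitioner in this patch.
 */
public class PassThroughPartitioner implements Partitioner {

    @Override
    public ReplicationEventMetadata partition(final HiveDROptions options, final String dbName,
                                              final Iterator<ReplicationTask> replicationTaskIterator) {
        // Callers that check isPartitioningRequired() first never reach this; return an empty holder.
        return new ReplicationEventMetadata();
    }

    @Override
    public boolean isPartitioningRequired(final HiveDROptions options) {
        return false;
    }
}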