http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java deleted file mode 100644 index ce4bfab..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/DefaultPartitioner.java +++ /dev/null @@ -1,317 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -import com.google.common.collect.Lists; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.util.DRStatusStore; -import org.apache.falcon.hive.util.EventSourcerUtils; -import org.apache.falcon.hive.util.HiveDRUtils; -import org.apache.falcon.hive.util.ReplicationStatus; -import org.apache.hive.hcatalog.api.repl.Command; -import org.apache.hive.hcatalog.api.repl.ReplicationTask; -import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.OutputStream; -import java.util.Arrays; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -import static org.apache.hive.hcatalog.api.HCatNotificationEvent.Scope; - -/** - * Partitioner for partitioning events for a given DB. 
- */ -public class DefaultPartitioner implements Partitioner { - - private static final Logger LOG = LoggerFactory.getLogger(DefaultPartitioner.class); - private EventFilter eventFilter; - private final DRStatusStore drStore; - private final EventSourcerUtils eventSourcerUtils; - - private enum CMDTYPE { - SRC_CMD_TYPE, - TGT_CMD_TYPE - } - - public DefaultPartitioner(DRStatusStore drStore, EventSourcerUtils eventSourcerUtils) { - this.drStore = drStore; - this.eventSourcerUtils = eventSourcerUtils; - } - - private class EventFilter { - private final Map<String, Long> eventFilterMap; - - public EventFilter(String sourceMetastoreUri, String targetMetastoreUri, String jobName, - String database) throws Exception { - eventFilterMap = new HashMap<>(); - Iterator<ReplicationStatus> replStatusIter = drStore.getTableReplicationStatusesInDb(sourceMetastoreUri, - targetMetastoreUri, jobName, database); - while (replStatusIter.hasNext()) { - ReplicationStatus replStatus = replStatusIter.next(); - eventFilterMap.put(replStatus.getTable(), replStatus.getEventId()); - } - } - } - - public ReplicationEventMetadata partition(final HiveDROptions drOptions, final String databaseName, - final Iterator<ReplicationTask> taskIter) throws Exception { - long lastCounter = 0; - String dbName = databaseName.toLowerCase(); - // init filtering before partitioning - this.eventFilter = new EventFilter(drOptions.getSourceMetastoreUri(), drOptions.getTargetMetastoreUri(), - drOptions.getJobName(), dbName); - String srcStagingDirProvider = drOptions.getSourceStagingPath(); - String dstStagingDirProvider = drOptions.getTargetStagingPath(); - - List<Command> dbSrcEventList = Lists.newArrayList(); - List<Command> dbTgtEventList = Lists.newArrayList(); - - Map<String, List<String>> eventMetaFileMap = - new HashMap<>(); - Map<String, List<OutputStream>> outputStreamMap = - new HashMap<>(); - - - String srcFilename = null; - String tgtFilename = null; - OutputStream srcOutputStream = null; - OutputStream tgtOutputStream = null; - - while (taskIter.hasNext()) { - ReplicationTask task = taskIter.next(); - if (task.needsStagingDirs()) { - task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider, - HiveDRUtils.SEPARATOR)); - task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider, - HiveDRUtils.SEPARATOR)); - } - - if (task.isActionable()) { - Scope eventScope = task.getEvent().getEventScope(); - String tableName = task.getEvent().getTableName(); - if (StringUtils.isNotEmpty(tableName)) { - tableName = tableName.toLowerCase(); - } - - boolean firstEventForTable = (eventScope == Scope.TABLE) - && isFirstEventForTable(eventMetaFileMap, tableName); - if (firstEventForTable && (task.getSrcWhCommands() != null || task.getDstWhCommands() != null)) { - ++lastCounter; - } - Iterable<? 
extends org.apache.hive.hcatalog.api.repl.Command> srcCmds = task.getSrcWhCommands(); - if (srcCmds != null) { - if (eventScope == Scope.DB) { - processDBScopeCommands(dbSrcEventList, srcCmds, outputStreamMap, CMDTYPE.SRC_CMD_TYPE); - } else if (eventScope == Scope.TABLE) { - OutputStream srcOut; - if (firstEventForTable) { - srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString(); - srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename); - srcOut = srcOutputStream; - } else { - srcOut = outputStreamMap.get(tableName).get(0); - } - processTableScopeCommands(srcCmds, eventMetaFileMap, tableName, dbSrcEventList, srcOut); - } else { - throw new Exception("Event scope is not DB or Table"); - } - } - - - Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> dstCmds = task.getDstWhCommands(); - if (dstCmds != null) { - if (eventScope == Scope.DB) { - processDBScopeCommands(dbTgtEventList, dstCmds, outputStreamMap, CMDTYPE.TGT_CMD_TYPE); - } else if (eventScope == Scope.TABLE) { - OutputStream tgtOut; - if (firstEventForTable) { - tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString(); - tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename); - tgtOut = tgtOutputStream; - } else { - tgtOut = outputStreamMap.get(tableName).get(1); - } - processTableScopeCommands(dstCmds, eventMetaFileMap, tableName, dbTgtEventList, tgtOut); - } else { - throw new Exception("Event scope is not DB or Table"); - } - } - - // If first table event, update the state data at the end - if (firstEventForTable) { - updateStateDataIfFirstTableEvent(tableName, srcFilename, tgtFilename, srcOutputStream, - tgtOutputStream, eventMetaFileMap, outputStreamMap); - } - } else { - LOG.error("Task is not actionable with event Id : {}", task.getEvent().getEventId()); - } - } - - ReplicationEventMetadata eventMetadata = new ReplicationEventMetadata(); - // If there were only DB events for this run - if (eventMetaFileMap.isEmpty()) { - ++lastCounter; - if (!dbSrcEventList.isEmpty()) { - srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString(); - srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename); - eventSourcerUtils.persistReplicationEvents(srcOutputStream, dbSrcEventList); - } - - if (!dbTgtEventList.isEmpty()) { - tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString(); - tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename); - eventSourcerUtils.persistReplicationEvents(tgtOutputStream, dbTgtEventList); - } - - // Close the stream - eventSourcerUtils.closeOutputStream(srcOutputStream); - eventSourcerUtils.closeOutputStream(tgtOutputStream); - EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, null, srcFilename, tgtFilename); - } else { - closeAllStreams(outputStreamMap); - for (Map.Entry<String, List<String>> entry : eventMetaFileMap.entrySet()) { - String srcFile = null; - String tgtFile = null; - if (entry.getValue() != null) { - srcFile = entry.getValue().get(0); - tgtFile = entry.getValue().get(1); - } - EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, entry.getKey(), srcFile, tgtFile); - } - } - - return eventMetadata; - } - - private void updateStateDataIfFirstTableEvent(final String tableName, final String srcFilename, - final String tgtFilename, - final OutputStream srcOutputStream, - final OutputStream tgtOutputStream, - Map<String, List<String>> eventMetaFileMap, - Map<String, List<OutputStream>> 
outputStreamMap) { - List<String> files = Arrays.asList(srcFilename, tgtFilename); - eventMetaFileMap.put(tableName, files); - - List<OutputStream> streams = Arrays.asList(srcOutputStream, tgtOutputStream); - outputStreamMap.put(tableName, streams); - } - - private void closeAllStreams(final Map<String, List<OutputStream>> outputStreamMap) throws Exception { - if (outputStreamMap == null || outputStreamMap.isEmpty()) { - return; - } - - for (Map.Entry<String, List<OutputStream>> entry : outputStreamMap.entrySet()) { - List<OutputStream> streams = entry.getValue(); - - for (OutputStream out : streams) { - if (out != null) { - eventSourcerUtils.closeOutputStream(out); - } - } - } - } - - private void processDBScopeCommands(final List<Command> dbEventList, final Iterable<? extends org.apache.hive - .hcatalog.api.repl.Command> cmds, final Map<String, List<OutputStream>> outputStreamMap, CMDTYPE cmdType - ) throws Exception { - addCmdsToDBEventList(dbEventList, cmds); - - /* add DB event to all tables */ - if (!outputStreamMap.isEmpty()) { - addDbEventToAllTablesEventFile(cmds, outputStreamMap, cmdType); - } - } - - private void processTableScopeCommands(final Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> cmds, - final Map<String, - List<String>> eventMetaFileMap, String tableName, - final List<Command> dbEventList, final OutputStream out) throws Exception { - // First event for this table - // Before adding this event, add all the DB events - if (isFirstEventForTable(eventMetaFileMap, tableName)) { - addDbEventsToTableEventFile(out, dbEventList, tableName); - } - addTableEventToFile(out, cmds, tableName); - } - - private boolean isFirstEventForTable(final Map<String, - List<String>> eventMetaFileMap, final String tableName) { - List<String> files = eventMetaFileMap.get(tableName); - return (files == null || files.isEmpty()); - } - - private void addCmdsToDBEventList(List<Command> dbEventList, final java.lang.Iterable - <? extends org.apache.hive.hcatalog.api.repl.Command> cmds) { - for (Command cmd : cmds) { - dbEventList.add(cmd); - } - } - - private void addDbEventToAllTablesEventFile( - final java.lang.Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> cmds, - final Map<String, List<OutputStream>> outputStreamMap, final CMDTYPE cmdType) throws Exception { - for (Map.Entry<String, List<OutputStream>> entry : outputStreamMap.entrySet()) { - String tableName = entry.getKey(); - List<OutputStream> streams = entry.getValue(); - OutputStream out; - if (CMDTYPE.SRC_CMD_TYPE == cmdType) { - out = streams.get(0); - } else { - out = streams.get(1); - } - addTableEventToFile(out, cmds, tableName); - } - } - - private void addDbEventsToTableEventFile(final OutputStream out, final List<Command> dbEventList, - final String tableName) throws Exception { - /* First event for the table, add db events before adding this event */ - addTableEventToFile(out, dbEventList, tableName); - } - - private void addTableEventToFile(final OutputStream out, - final java.lang.Iterable<? 
extends org.apache.hive.hcatalog.api.repl.Command> cmds, - final String tableName) throws Exception { - Long eventId = eventFilter.eventFilterMap.get(tableName); - /* If not already processed, add it */ - for (Command cmd : cmds) { - persistEvent(out, eventId, cmd); - } - } - - private void persistEvent(final OutputStream out, final Long eventId, final Command cmd) throws Exception { - if (out == null) { - LOG.debug("persistEvent : out is null"); - return; - } - if (eventId == null || cmd.getEventId() > eventId) { - eventSourcerUtils.persistReplicationEvents(out, cmd); - } - } - - public boolean isPartitioningRequired(final HiveDROptions options) { - return (HiveDRUtils.getReplicationType(options.getSourceTables()) == HiveDRUtils.ReplicationType.DB); - } -}
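
For readers tracing the removed DefaultPartitioner above, the core rule in persistEvent -- replay a command only when its table has no recorded replication status, or when the command's event id is newer than the last replicated id for that table -- can be restated as a small standalone sketch. The class below is illustrative only; it is not part of the Falcon codebase, and the table names and ids are made up:

import java.util.HashMap;
import java.util.Map;

public class EventFilterSketch {
    // Last replicated event id per table, as would be loaded from the DR status store.
    private final Map<String, Long> lastEventIdByTable = new HashMap<>();

    public EventFilterSketch(Map<String, Long> seed) {
        lastEventIdByTable.putAll(seed);
    }

    /** True when an event with the given id for the given table still needs to be replayed. */
    public boolean shouldPersist(String tableName, long commandEventId) {
        Long lastReplicated = lastEventIdByTable.get(tableName.toLowerCase());
        return lastReplicated == null || commandEventId > lastReplicated;
    }

    public static void main(String[] args) {
        Map<String, Long> seed = new HashMap<>();
        seed.put("orders", 120L);
        EventFilterSketch filter = new EventFilterSketch(seed);
        System.out.println(filter.shouldPersist("orders", 118L));    // false: already replicated
        System.out.println(filter.shouldPersist("orders", 121L));    // true: newer event
        System.out.println(filter.shouldPersist("customers", 5L));   // true: no status for this table yet
    }
}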
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java deleted file mode 100644 index 5f3312c..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/EventSourcer.java +++ /dev/null @@ -1,31 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -/** - * Source events for each table into a file. - */ -public interface EventSourcer { - /** - * @param inputOptions - * @return input filename to mapper - */ - /* Source events for each <db, table> into a file */ - String sourceEvents(HiveDROptions inputOptions) throws Exception; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java deleted file mode 100644 index 5490232..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRArgs.java +++ /dev/null @@ -1,122 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; - -/** - * Arguments for workflow execution. 
- */ -public enum HiveDRArgs { - - // source meta store details - SOURCE_CLUSTER("sourceCluster", "source cluster"), - SOURCE_METASTORE_URI("sourceMetastoreUri", "source meta store uri"), - SOURCE_HS2_URI("sourceHiveServer2Uri", "source HS2 uri"), - SOURCE_DATABASE("sourceDatabase", "comma source databases"), - SOURCE_TABLE("sourceTable", "comma source tables"), - SOURCE_STAGING_PATH("sourceStagingPath", "source staging path for data"), - - // source hadoop endpoints - SOURCE_NN("sourceNN", "source name node"), - // source security kerberos principals - SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false), - SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal", - "Source hive metastore kerberos principal", false), - SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal", "Source hiveserver2 kerberos principal", false), - - TARGET_CLUSTER("targetCluster", "target cluster"), - // target meta store details - TARGET_METASTORE_URI("targetMetastoreUri", "source meta store uri"), - TARGET_HS2_URI("targetHiveServer2Uri", "source meta store uri"), - - TARGET_STAGING_PATH("targetStagingPath", "source staging path for data"), - - // target hadoop endpoints - TARGET_NN("targetNN", "target name node"), - // target security kerberos principals - TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false), - TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal", - "Target hive metastore kerberos principal", false), - TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal", "Target hiveserver2 kerberos principal", false), - - // num events - MAX_EVENTS("maxEvents", "number of events to process in this run"), - - // tuning params - REPLICATION_MAX_MAPS("replicationMaxMaps", "number of maps", false), - DISTCP_MAX_MAPS("distcpMaxMaps", "number of maps", false), - - // Map Bandwidth - DISTCP_MAP_BANDWIDTH("distcpMapBandwidth", "map bandwidth in mb", false), - - JOB_NAME("drJobName", "unique job name"), - - CLUSTER_FOR_JOB_RUN("clusterForJobRun", "cluster where job runs"), - JOB_CLUSTER_NN("clusterForJobRunWriteEP", "write end point of cluster where job runs"), - JOB_CLUSTER_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal", - "Namenode kerberos principal of cluster on which replication job runs", false), - - - FALCON_LIBPATH("falconLibPath", "Falcon Lib Path for Jar files", false), - - KEEP_HISTORY("keepHistory", "Keep history of events file generated", false), - EXECUTION_STAGE("executionStage", "Flag for workflow stage execution", false), - COUNTER_LOGDIR("counterLogDir", "Log directory to store counter file", false); - - private final String name; - private final String description; - private final boolean isRequired; - - HiveDRArgs(String name, String description) { - this(name, description, true); - } - - HiveDRArgs(String name, String description, boolean isRequired) { - this.name = name; - this.description = description; - this.isRequired = isRequired; - } - - public Option getOption() { - return new Option(this.name, true, this.description); - } - - public String getName() { - return this.name; - } - - public String getDescription() { - return description; - } - - public boolean isRequired() { - return isRequired; - } - - public String getOptionValue(CommandLine cmd) { - return cmd.getOptionValue(this.name); - } - - @Override - public String toString() { - return getName(); - } -} 
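
Each HiveDRArgs constant above maps one commons-cli option to a workflow parameter; the real parsing lives in HiveDROptions.create, shown next. As a minimal, hypothetical usage snippet (argument values invented purely for illustration, and assuming HiveDRArgs is on the classpath), option registration and lookup look roughly like this:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

public class HiveDRArgsUsageSketch {
    public static void main(String[] args) throws ParseException {
        Options options = new Options();
        // Register every enum constant as a named option that takes a value.
        for (HiveDRArgs arg : HiveDRArgs.values()) {
            options.addOption(arg.getOption());
        }

        String[] sampleArgs = {
            "-sourceMetastoreUri", "thrift://source-host:9083",   // illustrative value
            "-targetMetastoreUri", "thrift://target-host:9083",   // illustrative value
            "-drJobName", "sales-db-dr",                          // illustrative value
        };
        CommandLine cmd = new GnuParser().parse(options, sampleArgs, false);
        System.out.println(HiveDRArgs.JOB_NAME.getOptionValue(cmd));  // prints sales-db-dr
    }
}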
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java deleted file mode 100644 index 28515e4..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDROptions.java +++ /dev/null @@ -1,183 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.exception.HiveReplicationException; - -import java.io.File; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Tool Options. 
- */ -public class HiveDROptions { - private final Map<HiveDRArgs, String> context; - - protected HiveDROptions(Map<HiveDRArgs, String> context) { - this.context = context; - } - - public String getValue(HiveDRArgs arg) { - return context.get(arg); - } - - public Map<HiveDRArgs, String> getContext() { - return context; - } - - public String getSourceMetastoreUri() { - return context.get(HiveDRArgs.SOURCE_METASTORE_URI); - } - - public String getSourceMetastoreKerberosPrincipal() { - return context.get(HiveDRArgs.SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL); - } - - public String getSourceHive2KerberosPrincipal() { - return context.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL); - } - - public List<String> getSourceDatabases() { - return Arrays.asList(context.get(HiveDRArgs.SOURCE_DATABASE).trim().split(",")); - } - - public List<String> getSourceTables() { - return Arrays.asList(context.get(HiveDRArgs.SOURCE_TABLE).trim().split(",")); - } - - public String getSourceStagingPath() throws HiveReplicationException { - if (StringUtils.isNotEmpty(context.get(HiveDRArgs.SOURCE_STAGING_PATH))) { - return context.get(HiveDRArgs.SOURCE_STAGING_PATH) + File.separator + getJobName(); - } - throw new HiveReplicationException("Source StagingPath cannot be empty"); - } - - public String getSourceWriteEP() { - return context.get(HiveDRArgs.SOURCE_NN); - } - - public String getSourceNNKerberosPrincipal() { - return context.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL); - } - - public String getTargetWriteEP() { - return context.get(HiveDRArgs.TARGET_NN); - } - - public String getTargetMetastoreUri() { - return context.get(HiveDRArgs.TARGET_METASTORE_URI); - } - - public String getTargetNNKerberosPrincipal() { - return context.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL); - } - - public String getTargetMetastoreKerberosPrincipal() { - return context.get(HiveDRArgs.TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL); - } - public String getTargetHive2KerberosPrincipal() { - return context.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL); - } - - public String getTargetStagingPath() throws HiveReplicationException { - if (StringUtils.isNotEmpty(context.get(HiveDRArgs.TARGET_STAGING_PATH))) { - return context.get(HiveDRArgs.TARGET_STAGING_PATH) + File.separator + getJobName(); - } - throw new HiveReplicationException("Target StagingPath cannot be empty"); - } - - public String getReplicationMaxMaps() { - return context.get(HiveDRArgs.REPLICATION_MAX_MAPS); - } - - public String getJobName() { - return context.get(HiveDRArgs.JOB_NAME); - } - - public int getMaxEvents() { - return Integer.valueOf(context.get(HiveDRArgs.MAX_EVENTS)); - } - - public boolean shouldKeepHistory() { - return Boolean.valueOf(context.get(HiveDRArgs.KEEP_HISTORY)); - } - - public String getJobClusterWriteEP() { - return context.get(HiveDRArgs.JOB_CLUSTER_NN); - } - - public String getJobClusterNNPrincipal() { - return context.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL); - } - - public void setSourceStagingDir(String path) { - context.put(HiveDRArgs.SOURCE_STAGING_PATH, path); - } - - public void setTargetStagingDir(String path) { - context.put(HiveDRArgs.TARGET_STAGING_PATH, path); - } - - public String getExecutionStage() { - return context.get(HiveDRArgs.EXECUTION_STAGE); - } - - public boolean shouldBlock() { - return true; - } - - public static HiveDROptions create(String[] args) throws ParseException { - Map<HiveDRArgs, String> options = new HashMap<HiveDRArgs, String>(); - - CommandLine cmd = getCommand(args); - for (HiveDRArgs arg : 
HiveDRArgs.values()) { - String optionValue = arg.getOptionValue(cmd); - if (StringUtils.isNotEmpty(optionValue)) { - options.put(arg, optionValue); - } - } - - return new HiveDROptions(options); - } - - private static CommandLine getCommand(String[] arguments) throws ParseException { - Options options = new Options(); - - for (HiveDRArgs arg : HiveDRArgs.values()) { - addOption(options, arg, arg.isRequired()); - } - - return new GnuParser().parse(options, arguments, false); - } - - private static void addOption(Options options, HiveDRArgs arg, boolean isRequired) { - Option option = arg.getOption(); - option.setRequired(isRequired); - options.addOption(option); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java deleted file mode 100644 index e141800..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/HiveDRTool.java +++ /dev/null @@ -1,393 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.hive; - -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.apache.falcon.hive.mapreduce.CopyMapper; -import org.apache.falcon.hive.mapreduce.CopyReducer; -import org.apache.falcon.hive.util.DRStatusStore; -import org.apache.falcon.hive.util.DelimiterUtils; -import org.apache.falcon.hive.util.EventSourcerUtils; -import org.apache.falcon.hive.util.FileUtils; -import org.apache.falcon.hive.util.HiveDRStatusStore; -import org.apache.falcon.hive.util.HiveDRUtils; -import org.apache.falcon.hive.util.HiveMetastoreUtils; -import org.apache.falcon.job.JobCounters; -import org.apache.falcon.job.JobCountersHandler; -import org.apache.falcon.job.JobType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.conf.Configured; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; -import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.hive.hcatalog.api.HCatClient; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.Map; - -/** - * DR Tool Driver. 
- */ -public class HiveDRTool extends Configured implements Tool { - - private static final String META_PATH_FILE_SUFFIX = ".metapath"; - - private FileSystem jobFS; - private FileSystem sourceClusterFS; - private FileSystem targetClusterFS; - - private HiveDROptions inputOptions; - private DRStatusStore drStore; - private String eventsMetaFile; - private EventSourcerUtils eventSoucerUtil; - private Configuration jobConf; - private String executionStage; - - public static final FsPermission STAGING_DIR_PERMISSION = - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL); - - private static final Logger LOG = LoggerFactory.getLogger(HiveDRTool.class); - - public HiveDRTool() { - } - - @Override - public int run(String[] args) throws Exception { - if (args.length < 1) { - usage(); - return -1; - } - - try { - init(args); - } catch (Throwable e) { - LOG.error("Invalid arguments: ", e); - System.err.println("Invalid arguments: " + e.getMessage()); - usage(); - return -1; - } - - try { - Job job = execute(); - if ((job != null) && (inputOptions.getExecutionStage().equalsIgnoreCase( - HiveDRUtils.ExecutionStage.EXPORT.name()))) { - if ((job.getStatus().getState() == JobStatus.State.SUCCEEDED) - && (job.getConfiguration().get("counterLogDir") != null)) { - LOG.info("Obtaining job replication counters for Hive DR job"); - Path counterFile = new Path(job.getConfiguration().get("counterLogDir"), "counter.txt"); - JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType( - JobType.HIVEREPLICATION.name()); - hiveReplicationCounters.obtainJobCounters(job.getConfiguration(), job, false); - hiveReplicationCounters.storeJobCounters(job.getConfiguration(), counterFile); - } - } - } catch (Exception e) { - System.err.println("Exception encountered " + e.getMessage()); - e.printStackTrace(); - LOG.error("Exception encountered, cleaning up staging dirs", e); - cleanup(); - return -1; - } - - if (inputOptions.getExecutionStage().equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { - cleanup(); - } - - return 0; - } - - private void init(String[] args) throws Exception { - LOG.info("Initializing HiveDR"); - inputOptions = parseOptions(args); - LOG.info("Input Options: {}", inputOptions); - - Configuration sourceConf = FileUtils.getConfiguration(inputOptions.getSourceWriteEP(), - inputOptions.getSourceNNKerberosPrincipal()); - sourceClusterFS = FileSystem.get(sourceConf); - Configuration targetConf = FileUtils.getConfiguration(inputOptions.getTargetWriteEP(), - inputOptions.getTargetNNKerberosPrincipal()); - targetClusterFS = FileSystem.get(targetConf); - jobConf = FileUtils.getConfiguration(inputOptions.getJobClusterWriteEP(), - inputOptions.getJobClusterNNPrincipal()); - jobFS = FileSystem.get(jobConf); - - // init DR status store - drStore = new HiveDRStatusStore(targetClusterFS); - eventSoucerUtil = new EventSourcerUtils(jobConf, inputOptions.shouldKeepHistory(), inputOptions.getJobName()); - } - - private HiveDROptions parseOptions(String[] args) throws ParseException { - return HiveDROptions.create(args); - } - - public Job execute() throws Exception { - assert inputOptions != null; - assert getConf() != null; - executionStage = inputOptions.getExecutionStage(); - LOG.info("Executing Workflow stage : {}", executionStage); - if (executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.LASTEVENTS.name())) { - String lastEventsIdFile = getLastEvents(jobConf); - LOG.info("Last successfully replicated Event file : {}", lastEventsIdFile); - return null; - } else if 
(executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { - createStagingDirectory(); - eventsMetaFile = sourceEvents(); - LOG.info("Sourced Events meta file : {}", eventsMetaFile); - if (StringUtils.isEmpty(eventsMetaFile)) { - LOG.info("No events to process"); - return null; - } else { - /* - * eventsMetaFile contains the events to be processed by HiveDr. This file should be available - * for the import action as well. Persist the file at a location common to both export and import. - */ - persistEventsMetafileLocation(eventsMetaFile); - } - } else if (executionStage.equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { - // read the location of eventsMetaFile from hdfs - eventsMetaFile = getEventsMetaFileLocation(); - if (StringUtils.isEmpty(eventsMetaFile)) { - LOG.info("No events to process"); - return null; - } - } else { - throw new HiveReplicationException("Invalid Execution stage : " + inputOptions.getExecutionStage()); - } - - Job job = createJob(); - job.submit(); - - String jobID = job.getJobID().toString(); - job.getConfiguration().set("HIVEDR_JOB_ID", jobID); - - LOG.info("HiveDR job-id: {}", jobID); - if (inputOptions.shouldBlock() && !job.waitForCompletion(true)) { - throw new IOException("HiveDR failure: Job " + jobID + " has failed: " - + job.getStatus().getFailureInfo()); - } - - return job; - } - - private Job createJob() throws Exception { - String jobName = "hive-dr" + executionStage; - String userChosenName = getConf().get(JobContext.JOB_NAME); - if (userChosenName != null) { - jobName += ": " + userChosenName; - } - Job job = Job.getInstance(getConf()); - job.setJobName(jobName); - job.setJarByClass(CopyMapper.class); - job.setMapperClass(CopyMapper.class); - job.setReducerClass(CopyReducer.class); - job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.NLineInputFormat.class); - - job.setOutputFormatClass(NullOutputFormat.class); - job.setMapOutputKeyClass(Text.class); - job.setMapOutputValueClass(Text.class); - - job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); - job.getConfiguration().set(JobContext.NUM_MAPS, - String.valueOf(inputOptions.getReplicationMaxMaps())); - - for (HiveDRArgs args : HiveDRArgs.values()) { - if (inputOptions.getValue(args) != null) { - job.getConfiguration().set(args.getName(), inputOptions.getValue(args)); - } else { - job.getConfiguration().set(args.getName(), "null"); - } - } - job.getConfiguration().set(FileInputFormat.INPUT_DIR, eventsMetaFile); - - return job; - } - - private void createStagingDirectory() throws IOException, HiveReplicationException { - Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath()); - Path targetStagingPath = new Path(inputOptions.getTargetStagingPath()); - LOG.info("Source staging path: {}", sourceStagingPath); - if (!FileSystem.mkdirs(sourceClusterFS, sourceStagingPath, STAGING_DIR_PERMISSION)) { - throw new IOException("mkdir failed for " + sourceStagingPath); - } - - LOG.info("Target staging path: {}", targetStagingPath); - if (!FileSystem.mkdirs(targetClusterFS, targetStagingPath, STAGING_DIR_PERMISSION)) { - throw new IOException("mkdir failed for " + targetStagingPath); - } - } - - private void cleanStagingDirectory() throws HiveReplicationException { - LOG.info("Cleaning staging directories"); - Path sourceStagingPath = new Path(inputOptions.getSourceStagingPath()); - Path targetStagingPath = new Path(inputOptions.getTargetStagingPath()); - try { - if (sourceClusterFS.exists(sourceStagingPath)) { - 
sourceClusterFS.delete(sourceStagingPath, true); - } - - if (targetClusterFS.exists(targetStagingPath)) { - targetClusterFS.delete(targetStagingPath, true); - } - } catch (IOException e) { - LOG.error("Unable to cleanup staging dir:", e); - } - } - - private String sourceEvents() throws Exception { - MetaStoreEventSourcer defaultSourcer = null; - String inputFilename = null; - String lastEventsIdFile = FileUtils.DEFAULT_EVENT_STORE_PATH +File.separator+inputOptions.getJobName()+"/" - +inputOptions.getJobName()+".id"; - Map<String, Long> lastEventsIdMap = getLastDBTableEvents(new Path(lastEventsIdFile)); - try { - HCatClient sourceMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient( - inputOptions.getSourceMetastoreUri(), - inputOptions.getSourceMetastoreKerberosPrincipal(), - inputOptions.getSourceHive2KerberosPrincipal()); - defaultSourcer = new MetaStoreEventSourcer(sourceMetastoreClient, - new DefaultPartitioner(drStore, eventSoucerUtil), eventSoucerUtil, lastEventsIdMap); - inputFilename = defaultSourcer.sourceEvents(inputOptions); - } finally { - if (defaultSourcer != null) { - defaultSourcer.cleanUp(); - } - } - - return inputFilename; - } - - private String getLastEvents(Configuration conf) throws Exception { - LastReplicatedEvents lastEvents = new LastReplicatedEvents(conf, - inputOptions.getTargetMetastoreUri(), - inputOptions.getTargetMetastoreKerberosPrincipal(), - inputOptions.getTargetHive2KerberosPrincipal(), - drStore, inputOptions); - String eventIdFile = lastEvents.getLastEvents(inputOptions); - lastEvents.cleanUp(); - return eventIdFile; - } - - private Map<String, Long> getLastDBTableEvents(Path lastEventIdFile) throws Exception { - Map<String, Long> lastEventsIdMap = new HashMap<String, Long>(); - BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(lastEventIdFile))); - try { - String line; - while ((line=in.readLine())!=null) { - String[] field = line.trim().split(DelimiterUtils.TAB_DELIM, -1); - lastEventsIdMap.put(field[0], Long.parseLong(field[1])); - } - } catch (Exception e) { - throw new IOException(e); - } finally { - IOUtils.closeQuietly(in); - } - - return lastEventsIdMap; - } - - public static void main(String[] args) { - int exitCode; - try { - HiveDRTool hiveDRTool = new HiveDRTool(); - exitCode = ToolRunner.run(HiveDRUtils.getDefaultConf(), hiveDRTool, args); - } catch (Exception e) { - LOG.error("Couldn't complete HiveDR operation: ", e); - exitCode = -1; - } - - System.exit(exitCode); - } - - private void cleanInputDir() { - eventSoucerUtil.cleanUpEventInputDir(); - } - - private synchronized void cleanup() throws HiveReplicationException { - cleanStagingDirectory(); - cleanInputDir(); - cleanTempFiles(); - } - - private void cleanTempFiles() { - Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName()); - Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX); - Path eventsFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + ".id"); - - try { - if (jobFS.exists(metaFilePath)) { - jobFS.delete(metaFilePath, true); - } - if (jobFS.exists(eventsFilePath)) { - jobFS.delete(eventsFilePath, true); - } - } catch (IOException e) { - LOG.error("Deleting Temp files failed", e); - } - } - - public void persistEventsMetafileLocation(final String eventMetaFilePath) throws IOException { - Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName()); - Path metaFilePath = new 
Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX); - - OutputStream out = null; - try { - out = FileSystem.create(jobFS, metaFilePath, FileUtils.FS_PERMISSION_700); - out.write(eventMetaFilePath.getBytes()); - out.flush(); - } finally { - IOUtils.closeQuietly(out); - } - } - - private String getEventsMetaFileLocation() throws IOException { - Path eventsDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, inputOptions.getJobName()); - Path metaFilePath = new Path(eventsDirPath.toString(), inputOptions.getJobName() + META_PATH_FILE_SUFFIX); - String line = null; - if (jobFS.exists(metaFilePath)) { - BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(metaFilePath))); - line = in.readLine(); - in.close(); - } - return line; - } - - - public static void usage() { - System.out.println("Usage: hivedrtool -option value ...."); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java deleted file mode 100644 index bae6c9e..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/LastReplicatedEvents.java +++ /dev/null @@ -1,196 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.apache.falcon.hive.util.DRStatusStore; -import org.apache.falcon.hive.util.DelimiterUtils; -import org.apache.falcon.hive.util.FileUtils; -import org.apache.falcon.hive.util.HiveDRUtils; -import org.apache.falcon.hive.util.HiveMetastoreUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hive.hcatalog.api.HCatClient; -import org.apache.hive.hcatalog.api.HCatTable; -import org.apache.hive.hcatalog.api.repl.ReplicationUtils; -import org.apache.hive.hcatalog.common.HCatException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * Sources meta store change events from Hive. 
- */ -public class LastReplicatedEvents { - private static final Logger LOG = LoggerFactory.getLogger(LastReplicatedEvents.class); - private final HCatClient targetMetastoreClient; - private final DRStatusStore drStore; - private final FileSystem jobFS; - private Path eventsInputDirPath; - - /* TODO handle cases when no events. files will be empty and lists will be empty */ - public LastReplicatedEvents(Configuration conf, String targetMetastoreUri, - String targetMetastoreKerberosPrincipal, - String targetHive2KerberosPrincipal, - DRStatusStore drStore, HiveDROptions inputOptions) throws Exception { - targetMetastoreClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(targetMetastoreUri, - targetMetastoreKerberosPrincipal, targetHive2KerberosPrincipal); - jobFS = FileSystem.get(conf); - this.drStore = drStore; - init(inputOptions.getJobName()); - } - - private void init(final String jobName) throws Exception { - // Create base dir to store events on cluster where job is running - Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH); - // Validate base path - FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH)); - - if (!jobFS.exists(dir)) { - if (!jobFS.mkdirs(dir)) { - throw new Exception("Creating directory failed: " + dir); - } - } - - eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName); - - if (!jobFS.exists(eventsInputDirPath)) { - if (!jobFS.mkdirs(eventsInputDirPath)) { - throw new Exception("Creating directory failed: " + eventsInputDirPath); - } - } - } - - public String getLastEvents(HiveDROptions inputOptions) throws Exception { - HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables()); - LOG.info("Obtaining last events for replicationType : {}", replicationType); - HashMap<String, Long> lastEvents = new HashMap<String, Long>(); - if (replicationType == HiveDRUtils.ReplicationType.DB) { - List<String> dbNames = inputOptions.getSourceDatabases(); - for (String db : dbNames) { - lastEvents.put(db, getLastSavedEventId(inputOptions, db, null)); - } - } else { - List<String> tableNames = inputOptions.getSourceTables(); - String db = inputOptions.getSourceDatabases().get(0); - for (String tableName : tableNames) { - lastEvents.put(db + "." + tableName, getLastSavedEventId(inputOptions, db, tableName)); - } - } - - return persistLastEventsToFile(lastEvents, inputOptions.getJobName()); - } - - private long getLastSavedEventId(HiveDROptions inputOptions, final String dbName, - final String tableName) throws Exception { - HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables()); - String jobName = inputOptions.getJobName(); - String sourceMetastoreUri = inputOptions.getSourceMetastoreUri(); - String targetMetastoreUri = inputOptions.getTargetMetastoreUri(); - - long eventId = 0; - if (HiveDRUtils.ReplicationType.DB == replicationType) { - eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri, - jobName, dbName).getEventId(); - } else if (HiveDRUtils.ReplicationType.TABLE == replicationType) { - eventId = drStore.getReplicationStatus(sourceMetastoreUri, targetMetastoreUri, - jobName, dbName, tableName).getEventId(); - } - - if (eventId == -1) { - if (HiveDRUtils.ReplicationType.DB == replicationType) { - /* - * API to get last repl ID for a DB is very expensive, so Hive does not want to make it public. - * HiveDrTool finds last repl id for DB by finding min last repl id of all tables. 
- */ - eventId = getLastReplicationIdForDatabase(dbName); - } else { - HCatTable table = targetMetastoreClient.getTable(dbName, tableName); - eventId = ReplicationUtils.getLastReplicationId(table); - } - } - - if ((StringUtils.isEmpty(tableName))) { - LOG.info("Last replicated eventId for DB : {} is {}", dbName, eventId); - } else { - LOG.info("Last replicated eventId for DB : {} Table : {} is {}", dbName, tableName, eventId); - } - - return eventId; - } - - private long getLastReplicationIdForDatabase(String databaseName) throws HiveReplicationException { - /* - * This is a very expensive method and should only be called during first dbReplication instance. - */ - long eventId = Long.MAX_VALUE; - try { - List<String> tableList = targetMetastoreClient.listTableNamesByPattern(databaseName, "*"); - for (String tableName : tableList) { - long temp = ReplicationUtils.getLastReplicationId( - targetMetastoreClient.getTable(databaseName, tableName)); - if (temp < eventId) { - eventId = temp; - } - } - return (eventId == Long.MAX_VALUE) ? 0 : eventId; - } catch (HCatException e) { - throw new HiveReplicationException("Unable to find last replication id for database " - + databaseName, e); - } - } - - public String persistLastEventsToFile(final HashMap<String, Long> lastEvents, - final String identifier) throws IOException { - if (lastEvents.size()!=0) { - Path eventsFile = new Path(eventsInputDirPath.toString(), identifier+".id"); - OutputStream out = null; - - try { - out = FileSystem.create(jobFS, eventsFile, FileUtils.FS_PERMISSION_700); - for (Map.Entry<String, Long> entry : lastEvents.entrySet()) { - out.write(entry.getKey().getBytes()); - out.write(DelimiterUtils.TAB_DELIM.getBytes()); - out.write(String.valueOf(entry.getValue()).getBytes()); - out.write(DelimiterUtils.NEWLINE_DELIM.getBytes()); - } - out.flush(); - } finally { - IOUtils.closeQuietly(out); - } - return jobFS.makeQualified(eventsFile).toString(); - } else { - return null; - } - } - - public void cleanUp() throws Exception { - if (targetMetastoreClient != null) { - targetMetastoreClient.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java deleted file mode 100644 index f008883..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/MetaStoreEventSourcer.java +++ /dev/null @@ -1,204 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.hive; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.util.EventSourcerUtils; -import org.apache.falcon.hive.util.HiveDRUtils; -import org.apache.hive.hcatalog.api.HCatClient; -import org.apache.hive.hcatalog.api.repl.ReplicationTask; -import org.apache.hive.hcatalog.api.repl.StagingDirectoryProvider; -import org.apache.hive.hcatalog.common.HCatException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.OutputStream; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Sources meta store change events from Hive. - */ -public class MetaStoreEventSourcer implements EventSourcer { - - private static final Logger LOG = LoggerFactory.getLogger(MetaStoreEventSourcer.class); - - private final HCatClient sourceMetastoreClient; - private final Partitioner partitioner; - private final EventSourcerUtils eventSourcerUtils; - private final ReplicationEventMetadata eventMetadata; - private Map<String, Long> lastEventsIdMap = null; - private long lastCounter; - - /* TODO handle cases when no events. files will be empty and lists will be empty */ - public MetaStoreEventSourcer(HCatClient sourceMetastoreClient, Partitioner partitioner, - EventSourcerUtils eventSourcerUtils, - Map<String, Long> lastEventsIdMap) throws Exception { - this.sourceMetastoreClient = sourceMetastoreClient; - this.eventMetadata = new ReplicationEventMetadata(); - this.partitioner = partitioner; - this.eventSourcerUtils = eventSourcerUtils; - this.lastEventsIdMap = lastEventsIdMap; - } - - public String sourceEvents(HiveDROptions inputOptions) throws Exception { - HiveDRUtils.ReplicationType replicationType = HiveDRUtils.getReplicationType(inputOptions.getSourceTables()); - LOG.info("Sourcing replication events for type : {}", replicationType); - if (replicationType == HiveDRUtils.ReplicationType.DB) { - List<String> dbNames = inputOptions.getSourceDatabases(); - for (String db : dbNames) { - ++lastCounter; - sourceEventsForDb(inputOptions, db); - } - } else { - List<String> tableNames = inputOptions.getSourceTables(); - String db = inputOptions.getSourceDatabases().get(0); - for (String tableName : tableNames) { - ++lastCounter; - sourceEventsForTable(inputOptions, db, tableName); - } - } - - if (eventMetadata.getEventFileMetadata() == null || eventMetadata.getEventFileMetadata().isEmpty()) { - LOG.info("No events for tables for the request db: {} , Tables : {}", inputOptions.getSourceDatabases(), - inputOptions.getSourceTables()); - eventSourcerUtils.cleanUpEventInputDir(); - return null; - } else { - return eventSourcerUtils.persistToMetaFile(eventMetadata, inputOptions.getJobName()); - } - } - - private void sourceEventsForDb(HiveDROptions inputOptions, String dbName) throws Exception { - Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, null), - inputOptions.getMaxEvents(), dbName, null); - if (replicationTaskIter == null || !replicationTaskIter.hasNext()) { - LOG.info("No events for db: {}", dbName); - return; - } - processEvents(dbName, null, inputOptions, replicationTaskIter); - } - - private void sourceEventsForTable(HiveDROptions inputOptions, String dbName, String tableName) - throws Exception { - Iterator<ReplicationTask> replicationTaskIter = sourceReplicationEvents(getLastSavedEventId(dbName, tableName), - inputOptions.getMaxEvents(), dbName, tableName - ); - if (replicationTaskIter == null || !replicationTaskIter.hasNext()) { - LOG.info("No 
events for db.table: {}.{}", dbName, tableName); - return; - } - processEvents(dbName, tableName, inputOptions, replicationTaskIter); - } - - private void processEvents(String dbName, String tableName, HiveDROptions inputOptions, - Iterator<ReplicationTask> replicationTaskIter) throws Exception { - if (partitioner.isPartitioningRequired(inputOptions)) { - ReplicationEventMetadata dbEventMetadata = partitioner.partition(inputOptions, dbName, replicationTaskIter); - - if (dbEventMetadata == null || dbEventMetadata.getEventFileMetadata() == null - || dbEventMetadata.getEventFileMetadata().isEmpty()) { - LOG.info("No events for db: {} , Table : {}", dbName, tableName); - } else { - EventSourcerUtils.updateEventMetadata(eventMetadata, dbEventMetadata); - } - } else { - processTableReplicationEvents(replicationTaskIter, dbName, tableName, - inputOptions.getSourceStagingPath(), inputOptions.getTargetStagingPath()); - } - } - - private long getLastSavedEventId(final String dbName, final String tableName) throws Exception { - String key = dbName; - if (StringUtils.isNotEmpty(tableName)) { - key = dbName + "." + tableName; - } - long eventId = lastEventsIdMap.get(key); - LOG.info("LastSavedEventId eventId for {} : {}", key, eventId); - return eventId; - } - - private Iterator<ReplicationTask> sourceReplicationEvents(long lastEventId, int maxEvents, String dbName, - String tableName) throws Exception { - try { - return sourceMetastoreClient.getReplicationTasks(lastEventId, maxEvents, dbName, tableName); - } catch (HCatException e) { - throw new Exception("Exception getting replication events " + e.getMessage(), e); - } - } - - - protected void processTableReplicationEvents(Iterator<ReplicationTask> taskIter, String dbName, - String tableName, String srcStagingDirProvider, - String dstStagingDirProvider) throws Exception { - String srcFilename = null; - String tgtFilename = null; - OutputStream srcOutputStream = null; - OutputStream tgtOutputStream = null; - - while (taskIter.hasNext()) { - ReplicationTask task = taskIter.next(); - if (task.needsStagingDirs()) { - task.withSrcStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(srcStagingDirProvider, - HiveDRUtils.SEPARATOR)); - task.withDstStagingDirProvider(new StagingDirectoryProvider.TrivialImpl(dstStagingDirProvider, - HiveDRUtils.SEPARATOR)); - } - - if (task.isActionable()) { - Iterable<? extends org.apache.hive.hcatalog.api.repl.Command> srcCmds = task.getSrcWhCommands(); - if (srcCmds != null) { - if (StringUtils.isEmpty(srcFilename)) { - srcFilename = eventSourcerUtils.getSrcFileName(String.valueOf(lastCounter)).toString(); - srcOutputStream = eventSourcerUtils.getFileOutputStream(srcFilename); - } - eventSourcerUtils.persistReplicationEvents(srcOutputStream, srcCmds); - } - - - Iterable<? 
extends org.apache.hive.hcatalog.api.repl.Command> dstCmds = task.getDstWhCommands(); - if (dstCmds != null) { - if (StringUtils.isEmpty(tgtFilename)) { - tgtFilename = eventSourcerUtils.getTargetFileName(String.valueOf(lastCounter)).toString(); - tgtOutputStream = eventSourcerUtils.getFileOutputStream(tgtFilename); - } - eventSourcerUtils.persistReplicationEvents(tgtOutputStream, dstCmds); - } - - } else { - LOG.error("Task is not actionable with event Id : {}", task.getEvent().getEventId()); - } - } - // Close the stream - eventSourcerUtils.closeOutputStream(srcOutputStream); - eventSourcerUtils.closeOutputStream(tgtOutputStream); - EventSourcerUtils.updateEventMetadata(eventMetadata, dbName, tableName, srcFilename, tgtFilename); - } - - public String persistToMetaFile(String jobName) throws Exception { - return eventSourcerUtils.persistToMetaFile(eventMetadata, jobName); - } - - public void cleanUp() throws Exception { - if (sourceMetastoreClient != null) { - sourceMetastoreClient.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java deleted file mode 100644 index 25b8bd6..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/Partitioner.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -import org.apache.hive.hcatalog.api.repl.ReplicationTask; - -import java.util.Iterator; - -/** - * Partition hive events. - */ -public interface Partitioner { - /** - * Partition events. - * - * @param options Hive dr options. - * @param dbName Database name. - * @param replicationTaskIterator Repl task iterator. 
- * @return ReplicationEventMetadata - */ - ReplicationEventMetadata partition(final HiveDROptions options, - final String dbName, - final Iterator<ReplicationTask> replicationTaskIterator) throws Exception; - - boolean isPartitioningRequired(final HiveDROptions options); -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java deleted file mode 100644 index 79e0ded..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/ReplicationEventMetadata.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive; - -import java.util.Map; -import java.util.HashMap; - -/** - * Replication event meta data class. - */ -public class ReplicationEventMetadata { - - private Map<String, String> eventFileMetadata = new HashMap<>(); - - public Map<String, String> getEventFileMetadata() { - return eventFileMetadata; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java deleted file mode 100644 index 0baf6d8..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/exception/HiveReplicationException.java +++ /dev/null @@ -1,49 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.exception; - -/** - * Wrapper class for HiveReplication exceptions. 
- */ -public class HiveReplicationException extends Exception { - - /** - * @param e Exception - */ - public HiveReplicationException(Throwable e) { - super(e); - } - - public HiveReplicationException(String message, Throwable e) { - super(message, e); - } - - /** - * @param message - custom exception message - */ - public HiveReplicationException(String message) { - super(message); - } - - /** - * - */ - private static final long serialVersionUID = -1475818869309247014L; - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java deleted file mode 100644 index 98449f0..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.mapreduce; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * Copy committer class. - */ -public class CopyCommitter extends FileOutputCommitter { - - private static final Logger LOG = LoggerFactory.getLogger(CopyCommitter.class); - - /** - * Create a file output committer. - * - * @param outputPath the job's output path, or null if you want the output - * committer to act as a noop. 
- * @param context the task's context - * @throws java.io.IOException - */ - public CopyCommitter(Path outputPath, - TaskAttemptContext context) throws IOException { - super(outputPath, context); - } - - @Override - public void commitJob(JobContext jobContext) throws IOException { - Configuration conf = jobContext.getConfiguration(); - - try { - super.commitJob(jobContext); - } finally { - cleanup(conf); - } - } - - private void cleanup(Configuration conf) { - // clean up staging and other data - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java deleted file mode 100644 index 08e0551..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.mapreduce; - -import org.apache.falcon.hive.HiveDRArgs; -import org.apache.falcon.hive.util.EventUtils; -import org.apache.falcon.hive.util.HiveDRUtils; -import org.apache.falcon.hive.util.ReplicationStatus; -import org.apache.falcon.job.ReplicationJobCountersList; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Mapper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.List; - -/** - * Map class for Hive DR. 
- */ -public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> { - - private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class); - private EventUtils eventUtils; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - eventUtils = new EventUtils(context.getConfiguration()); - eventUtils.initializeFS(); - try { - eventUtils.setupConnection(); - } catch (Exception e) { - throw new IOException(e); - } - } - - @Override - protected void map(LongWritable key, Text value, - Context context) throws IOException, InterruptedException { - LOG.debug("Processing Event value: {}", value.toString()); - - try { - eventUtils.processEvents(value.toString()); - } catch (Exception e) { - LOG.error("Exception in processing events:", e); - throw new IOException(e); - } finally { - cleanup(context); - } - List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus(); - if (replicationStatusList != null && !replicationStatusList.isEmpty()) { - for (ReplicationStatus rs : replicationStatusList) { - context.write(new Text(rs.getJobName()), new Text(rs.toString())); - } - } - - // In case of export stage, populate custom counters - if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName()) - .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name()) - && !eventUtils.isCountersMapEmtpy()) { - context.getCounter(ReplicationJobCountersList.BYTESCOPIED).increment( - eventUtils.getCounterValue(ReplicationJobCountersList.BYTESCOPIED.getName())); - context.getCounter(ReplicationJobCountersList.COPY).increment( - eventUtils.getCounterValue(ReplicationJobCountersList.COPY.getName())); - } - } - - protected void cleanup(Context context) throws IOException, InterruptedException { - LOG.info("Invoking cleanup process"); - super.cleanup(context); - try { - if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName()) - .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { - eventUtils.cleanEventsDirectory(); - } - } catch (IOException e) { - LOG.error("Cleaning up of events directories failed", e); - } finally { - try { - eventUtils.closeConnection(); - } catch (SQLException e) { - LOG.error("Closing the connections failed", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java deleted file mode 100644 index 50cb4b2..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java +++ /dev/null @@ -1,85 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.mapreduce; - - -import org.apache.falcon.hive.HiveDRArgs; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.apache.falcon.hive.util.DRStatusStore; -import org.apache.falcon.hive.util.FileUtils; -import org.apache.falcon.hive.util.HiveDRStatusStore; -import org.apache.falcon.hive.util.ReplicationStatus; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.mapreduce.Reducer; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.List; - -/** - * Reducer class for Hive DR. - */ -public class CopyReducer extends Reducer<Text, Text, Text, Text> { - private DRStatusStore hiveDRStore; - - @Override - protected void setup(Context context) throws IOException, InterruptedException { - Configuration conf = context.getConfiguration(); - FileSystem fs= FileSystem.get(FileUtils.getConfiguration( - conf.get(HiveDRArgs.TARGET_NN.getName()), - conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName()))); - hiveDRStore = new HiveDRStatusStore(fs); - } - - private List<ReplicationStatus> sortStatusList(List<ReplicationStatus> replStatusList) { - Collections.sort(replStatusList, new Comparator<ReplicationStatus>() { - @Override - public int compare(ReplicationStatus r1, ReplicationStatus r2) { - return (int) (r1.getEventId() - r2.getEventId()); - } - }); - return replStatusList; - } - - @Override - protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { - List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>(); - ReplicationStatus rs; - try { - for (Text value : values) { - String[] fields = (value.toString()).split("\t"); - rs = new ReplicationStatus(fields[0], fields[1], fields[2], fields[3], fields[4], - ReplicationStatus.Status.valueOf(fields[5]), Long.parseLong(fields[6])); - replStatusList.add(rs); - } - - hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList)); - } catch (HiveReplicationException e) { - throw new IOException(e); - } - } - - @Override - protected void cleanup(Context context) throws IOException, InterruptedException { - } -}
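Note on the removed CopyReducer: before persisting a job's replication statuses it orders them by event id (sortStatusList) so that updateReplicationStatus() sees events in sequence. Below is a minimal, self-contained Java sketch of that ordering step; it is not taken from the deleted files. EventRecord is a hypothetical stand-in for org.apache.falcon.hive.util.ReplicationStatus, and Comparator.comparingLong is used here because casting the difference of two long event ids to int, as the removed comparator does, can overflow when the ids are far apart.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    /** Sketch only: orders event records by event id, mirroring CopyReducer.sortStatusList(). */
    public final class EventOrderingSketch {

        /** Hypothetical stand-in for ReplicationStatus; only the event id matters here. */
        static final class EventRecord {
            private final String table;
            private final long eventId;

            EventRecord(String table, long eventId) {
                this.table = table;
                this.eventId = eventId;
            }

            long getEventId() {
                return eventId;
            }

            @Override
            public String toString() {
                return table + ":" + eventId;
            }
        }

        public static void main(String[] args) {
            List<EventRecord> records = new ArrayList<>();
            records.add(new EventRecord("clicks", 42L));
            records.add(new EventRecord("clicks", 7L));
            records.add(new EventRecord("clicks", 19L));

            // Long-safe comparison: avoids the int overflow that (int) (a - b)
            // can produce when two long event ids are widely spaced.
            records.sort(Comparator.comparingLong(EventRecord::getEventId));

            System.out.println(records); // prints [clicks:7, clicks:19, clicks:42]
        }
    }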