http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java deleted file mode 100644 index 6dceb8e..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java +++ /dev/null @@ -1,213 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * Class to store replication status of a DB and it's tables. 
- */ -public class DBReplicationStatus { - - private static final Logger LOG = LoggerFactory.getLogger(DBReplicationStatus.class); - private static final String DB_STATUS = "db_status"; - private static final String TABLE_STATUS = "table_status"; - - private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>(); - private ReplicationStatus databaseStatus; - - public DBReplicationStatus(ReplicationStatus dbStatus) throws HiveReplicationException { - setDatabaseStatus(dbStatus); - } - - public DBReplicationStatus(ReplicationStatus dbStatus, - Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException { - /* - The order of set method calls is important to ensure tables that do not belong to same db - are not added to this DBReplicationStatus - */ - setDatabaseStatus(dbStatus); - setTableStatuses(tableStatuses); - } - - // Serialize - public String toJsonString() throws HiveReplicationException { - JSONObject retObject = new JSONObject(); - JSONObject tableStatus = new JSONObject(); - try { - for (Map.Entry<String, ReplicationStatus> status : tableStatuses.entrySet()) { - tableStatus.put(status.getKey(), status.getValue().toJsonObject()); - } - retObject.put(DB_STATUS, databaseStatus.toJsonObject()); - retObject.put(TABLE_STATUS, tableStatus); - return retObject.toString(ReplicationStatus.INDENT_FACTOR); - } catch (JSONException e) { - throw new HiveReplicationException("Unable to serialize Database Replication Status", e); - } - } - - // de-serialize - public DBReplicationStatus(String jsonString) throws HiveReplicationException { - try { - JSONObject object = new JSONObject(jsonString); - ReplicationStatus dbstatus = new ReplicationStatus(object.get(DB_STATUS).toString()); - setDatabaseStatus(dbstatus); - - JSONObject tableJson = object.getJSONObject(TABLE_STATUS); - Iterator keys = tableJson.keys(); - while(keys.hasNext()) { - String key = keys.next().toString(); - ReplicationStatus value = new ReplicationStatus(tableJson.get(key).toString()); - if (value.getDatabase().equals(dbstatus.getDatabase())) { - tableStatuses.put(key.toLowerCase(), value); - } else { - throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString. " - + "Cannot set status for table " + value.getDatabase() + "." 
+ value.getTable() - + ", It does not belong to DB " + dbstatus.getDatabase()); - } - } - } catch (JSONException e) { - throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString", e); - } - } - - public Map<String, ReplicationStatus> getTableStatuses() { - return tableStatuses; - } - - public ReplicationStatus getTableStatus(String tableName) throws HiveReplicationException { - tableName = tableName.toLowerCase(); - if (tableStatuses.containsKey(tableName)) { - return tableStatuses.get(tableName); - } - return new ReplicationStatus(databaseStatus.getSourceUri(), databaseStatus.getTargetUri(), - databaseStatus.getJobName(), databaseStatus.getDatabase(), - tableName, ReplicationStatus.Status.INIT, -1); - } - - public Iterator<ReplicationStatus> getTableStatusIterator() { - List<ReplicationStatus> resultSet = new ArrayList<ReplicationStatus>(); - for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) { - resultSet.add(entry.getValue()); - } - return resultSet.iterator(); - } - - private void setTableStatuses(Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException { - for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) { - if (!entry.getValue().getDatabase().equals(databaseStatus.getDatabase())) { - throw new HiveReplicationException("Cannot set status for table " + entry.getValue().getDatabase() - + "." + entry.getValue().getTable() + ", It does not belong to DB " - + databaseStatus.getDatabase()); - } else { - this.tableStatuses.put(entry.getKey().toLowerCase(), entry.getValue()); - } - } - } - - public ReplicationStatus getDatabaseStatus() { - return databaseStatus; - } - - private void setDatabaseStatus(ReplicationStatus databaseStatus) { - this.databaseStatus = databaseStatus; - } - - /** - * Update DB status from table statuses. - case 1) All tables replicated successfully. - Take the largest successful eventId and set dbReplStatus as success - case 2) One or many tables failed to replicate - Take the smallest eventId amongst the failed tables and set dbReplStatus as failed. - */ - public void updateDbStatusFromTableStatuses() throws HiveReplicationException { - if (tableStatuses.size() == 0) { - // nothing to do - return; - } - - databaseStatus.setStatus(ReplicationStatus.Status.SUCCESS); - long successEventId = databaseStatus.getEventId(); - long failedEventId = -1; - - for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) { - long eventId = entry.getValue().getEventId(); - if (entry.getValue().getStatus().equals(ReplicationStatus.Status.SUCCESS)) { - if (eventId > successEventId) { - successEventId = eventId; - } - } else if (entry.getValue().getStatus().equals(ReplicationStatus.Status.FAILURE)) { - databaseStatus.setStatus(ReplicationStatus.Status.FAILURE); - if (eventId < failedEventId || failedEventId == -1) { - failedEventId = eventId; - } - } //else , if table status is Status.INIT, it should not change lastEventId of DB - } - - String log = "Updating DB Status based on table replication status. 
Status : " - + databaseStatus.getStatus().toString() + ", eventId : "; - if (databaseStatus.getStatus().equals(ReplicationStatus.Status.SUCCESS)) { - databaseStatus.setEventId(successEventId); - LOG.info(log + String.valueOf(successEventId)); - } else if (databaseStatus.getStatus().equals(ReplicationStatus.Status.FAILURE)) { - databaseStatus.setEventId(failedEventId); - LOG.error(log + String.valueOf(failedEventId)); - } - - } - - public void updateDbStatus(ReplicationStatus status) throws HiveReplicationException { - if (StringUtils.isNotEmpty(status.getTable())) { - throw new HiveReplicationException("Cannot update DB Status. This is table level status."); - } - - if (this.databaseStatus.getDatabase().equals(status.getDatabase())) { - this.databaseStatus = status; - } else { - throw new HiveReplicationException("Cannot update Database Status. StatusDB " - + status.getDatabase() + " does not match current DB " - + this.databaseStatus.getDatabase()); - } - } - - public void updateTableStatus(ReplicationStatus status) throws HiveReplicationException { - if (StringUtils.isEmpty(status.getTable())) { - throw new HiveReplicationException("Cannot update Table Status. Table name is empty."); - } - - if (this.databaseStatus.getDatabase().equals(status.getDatabase())) { - this.tableStatuses.put(status.getTable(), status); - } else { - throw new HiveReplicationException("Cannot update Table Status. TableDB " - + status.getDatabase() + " does not match current DB " - + this.databaseStatus.getDatabase()); - } - } -}
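For reference, the removed DBReplicationStatus and ReplicationStatus classes round-trip replication state through JSON. Below is a minimal usage sketch based only on the constructors and methods visible in the diff above; the class name DbStatusRoundTripExample, the cluster URIs, and the job/database/table names are illustrative placeholders, not values taken from the Falcon code base.

import org.apache.falcon.hive.exception.HiveReplicationException;
import org.apache.falcon.hive.util.DBReplicationStatus;
import org.apache.falcon.hive.util.ReplicationStatus;

public final class DbStatusRoundTripExample {
    public static void main(String[] args) throws HiveReplicationException {
        // Database-level status: table name is null, eventId -1 means no events replicated yet.
        ReplicationStatus dbLevel = new ReplicationStatus("hdfs://source-nn:8020", "hdfs://target-nn:8020",
                "drJob1", "salesdb", null, ReplicationStatus.Status.INIT, -1);
        DBReplicationStatus dbStatus = new DBReplicationStatus(dbLevel);

        // Table-level status for the same database and job.
        ReplicationStatus tableLevel = new ReplicationStatus("hdfs://source-nn:8020", "hdfs://target-nn:8020",
                "drJob1", "salesdb", "orders", ReplicationStatus.Status.SUCCESS, 42);
        dbStatus.updateTableStatus(tableLevel);

        // Roll the table statuses up into the DB-level status, then serialize for the status store.
        dbStatus.updateDbStatusFromTableStatuses();
        String json = dbStatus.toJsonString();

        // De-serialize and read back a single table's status.
        DBReplicationStatus restored = new DBReplicationStatus(json);
        System.out.println(restored.getTableStatus("orders").getEventId()); // prints 42
    }
}

Because all table statuses here are SUCCESS, updateDbStatusFromTableStatuses() marks the database status SUCCESS with the largest successful eventId (42); a single FAILURE table would instead mark it FAILURE with the smallest failed eventId, as described in that method's comment.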
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java deleted file mode 100644 index cf6b7ad..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; - -import java.util.Iterator; -import java.util.List; - -/** - * Abstract class for Data Replication Status Store. - */ -public abstract class DRStatusStore { - - public static final String BASE_DEFAULT_STORE_PATH = "/apps/data-mirroring/"; - public static final FsPermission DEFAULT_STORE_PERMISSION = - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE); - - private static String storeGroup = "users"; - - - /** - * Update replication status of a table(s)/db after replication job jobName completes. - * @param jobName Name of the replication job. - * @param statusList List of replication statuses of db/tables replicated by jobName. - */ - public abstract void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList) - throws HiveReplicationException; - - /** - * Get Replication status for a database. - * @param source Replication source uri. - * @param target Replication target uri. - * @param jobName Name of the replication job. - * @param database Name of the target database. - * @return ReplicationStatus - * destination commands for each table - */ - public abstract ReplicationStatus getReplicationStatus(String source, String target, - String jobName, String database) - throws HiveReplicationException; - - /** - * Get Replication status for a table. - * @param source Replication source uri. - * @param target Replication target uri. - * @param jobName Name of the replication job. - * @param database Name of the target database. - * @param table Name of the target table. - * @return ReplicationStatus - * destination commands for each table - */ - public abstract ReplicationStatus getReplicationStatus(String source, String target, - String jobName, String database, - String table) throws HiveReplicationException; - - /** - * Get Replication status of all tables in a database. - * @param source Replication source uri. - * @param target Replication target uri. - * @param jobName Name of the replication job. 
- * @param database Name of the target database. - * @return Iterator - * destination commands for each table - */ - public abstract Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target, - String jobName, String database) - throws HiveReplicationException; - - - /** - * Delete a replication job. - * @param jobName Name of the replication job. - * @param database Name of the target database. - * destination commands for each table - */ - public abstract void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException; - - public static String getStoreGroup() { - return storeGroup; - } - - public static void setStoreGroup(String group) { - storeGroup = group; - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java deleted file mode 100644 index 3b3156f..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -/** - * Public delimiters used for event processing. - */ -public final class DelimiterUtils { - public static final String FIELD_DELIM = "\u0001"; - public static final String NEWLINE_DELIM = System.getProperty("line.separator"); - public static final String TAB_DELIM = "\t"; - - private DelimiterUtils() {} -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java deleted file mode 100644 index fb695d0..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java +++ /dev/null @@ -1,189 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.ReplicationEventMetadata; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hive.hcatalog.api.repl.Command; -import org.apache.hive.hcatalog.api.repl.ReplicationUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.OutputStream; -import java.util.Map; - -/** - * Utility methods for event sourcer. - */ -public class EventSourcerUtils { - - private static final String METAFILE_EXTENSION = ".meta"; - private static final String SRCFILE_EXTENSION = ".src"; - private static final String TGTFILE_EXTENSION = ".tgt"; - private Path eventsInputDirPath; - private final boolean shouldKeepHistory; - private final FileSystem jobFS; - - private static final Logger LOG = LoggerFactory.getLogger(EventSourcerUtils.class); - - public EventSourcerUtils(final Configuration conf, final boolean shouldKeepHistory, - final String jobName) throws Exception { - this.shouldKeepHistory = shouldKeepHistory; - jobFS = FileSystem.get(conf); - init(jobName); - } - - private void init(final String jobName) throws Exception { - // Create base dir to store events on cluster where job is running - Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH); - // Validate base path - FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH)); - - if (!jobFS.exists(dir)) { - if (!jobFS.mkdirs(dir)) { - throw new Exception("Creating directory failed: " + dir); - } - } - - eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName); - - if (!jobFS.exists(eventsInputDirPath)) { - if (!jobFS.mkdirs(eventsInputDirPath)) { - throw new Exception("Creating directory failed: " + eventsInputDirPath); - } - } - } - - public OutputStream getFileOutputStream(final String path) throws Exception { - return FileSystem.create(jobFS, new Path(path), FileUtils.FS_PERMISSION_700); - } - - public void closeOutputStream(OutputStream out) throws IOException { - if (out != null) { - try { - out.flush(); - } finally { - IOUtils.closeQuietly(out); - } - } - } - - public void persistReplicationEvents(final OutputStream out, - final java.lang.Iterable - <? 
extends org.apache.hive.hcatalog.api.repl.Command> cmds) - throws Exception { - for (Command cmd : cmds) { - persistReplicationEvents(out, cmd); - } - } - - public void persistReplicationEvents(final OutputStream out, - final Command cmd) throws Exception { - out.write(ReplicationUtils.serializeCommand(cmd).getBytes()); - LOG.debug("HiveDR Serialized Repl Command : {}", cmd); - out.write(DelimiterUtils.NEWLINE_DELIM.getBytes()); - } - - public String persistToMetaFile(final ReplicationEventMetadata data, final String identifier) throws IOException { - if (data != null && data.getEventFileMetadata() != null && !data.getEventFileMetadata().isEmpty()) { - Path metaFilename = new Path(eventsInputDirPath.toString(), identifier + METAFILE_EXTENSION); - OutputStream out = null; - - try { - out = FileSystem.create(jobFS, metaFilename, FileUtils.FS_PERMISSION_700); - - for (Map.Entry<String, String> entry : data.getEventFileMetadata().entrySet()) { - out.write(entry.getKey().getBytes()); - out.write(DelimiterUtils.FIELD_DELIM.getBytes()); - out.write(entry.getValue().getBytes()); - out.write(DelimiterUtils.NEWLINE_DELIM.getBytes()); - } - out.flush(); - } finally { - IOUtils.closeQuietly(out); - } - return jobFS.makeQualified(metaFilename).toString(); - } else { - return null; - } - } - - public static void updateEventMetadata(ReplicationEventMetadata data, final String dbName, final String tableName, - final String srcFilename, final String tgtFilename) { - if (data == null || data.getEventFileMetadata() == null) { - return; - } - StringBuilder key = new StringBuilder(); - - if (StringUtils.isNotEmpty(dbName)) { - key.append(Base64.encodeBase64URLSafeString(dbName.toLowerCase().getBytes())); - } - key.append(DelimiterUtils.FIELD_DELIM); - if (StringUtils.isNotEmpty(tableName)) { - key.append(Base64.encodeBase64URLSafeString(tableName.toLowerCase().getBytes())); - } - - StringBuilder value = new StringBuilder(); - if (StringUtils.isNotEmpty(srcFilename)) { - value.append(srcFilename); - } - value.append(DelimiterUtils.FIELD_DELIM); - - if (StringUtils.isNotEmpty(tgtFilename)) { - value.append(tgtFilename); - } - - data.getEventFileMetadata().put(key.toString(), value.toString()); - } - - public static void updateEventMetadata(ReplicationEventMetadata data, final ReplicationEventMetadata inputData) { - if (data == null || data.getEventFileMetadata() == null || inputData == null - || inputData.getEventFileMetadata() == null || inputData.getEventFileMetadata().isEmpty()) { - return; - } - - data.getEventFileMetadata().putAll(inputData.getEventFileMetadata()); - } - - public Path getSrcFileName(final String identifier) { - return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + SRCFILE_EXTENSION)); - } - - public Path getTargetFileName(final String identifier) { - return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + TGTFILE_EXTENSION)); - } - - public void cleanUpEventInputDir() { - if (!shouldKeepHistory) { - try { - jobFS.delete(eventsInputDirPath, true); - eventsInputDirPath = null; - } catch (IOException e) { - LOG.error("Unable to cleanup: {}", eventsInputDirPath, e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java deleted file mode 100644 index 
d075bfb..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java +++ /dev/null @@ -1,393 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.commons.codec.binary.Base64; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.HiveDRArgs; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.mapreduce.JobStatus; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.tools.DistCp; -import org.apache.hadoop.tools.DistCpOptions; -import org.apache.hive.hcatalog.api.repl.Command; -import org.apache.hive.hcatalog.api.repl.ReplicationUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.File; -import java.io.IOException; -import java.io.InputStreamReader; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Properties; - -/** - * Utility class to handle Hive events for data-mirroring. 
- */ -public class EventUtils { - private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"; - private static final int TIMEOUT_IN_SECS = 300; - private static final String JDBC_PREFIX = "jdbc:"; - private static final int RETRY_ATTEMPTS = 3; - - private Configuration conf = null; - private String sourceHiveServer2Uri = null; - private String sourceDatabase = null; - private String sourceNN = null; - private String sourceNNKerberosPrincipal = null; - private String jobNN = null; - private String jobNNKerberosPrincipal = null; - private String targetHiveServer2Uri = null; - private String targetStagingPath = null; - private String targetNN = null; - private String targetNNKerberosPrincipal = null; - private String fullyQualifiedTargetStagingPath = null; - private List<Path> sourceCleanUpList = null; - private List<Path> targetCleanUpList = null; - private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class); - - private FileSystem sourceFileSystem = null; - private FileSystem jobFileSystem = null; - private FileSystem targetFileSystem = null; - private Connection sourceConnection = null; - private Connection targetConnection = null; - private Statement sourceStatement = null; - private Statement targetStatement = null; - - private Map<String, Long> countersMap = null; - - private List<ReplicationStatus> listReplicationStatus; - - public EventUtils(Configuration conf) { - this.conf = conf; - sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName()); - sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName()); - sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName()); - sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName()); - jobNN = conf.get(HiveDRArgs.JOB_CLUSTER_NN.getName()); - jobNNKerberosPrincipal = conf.get(HiveDRArgs.JOB_CLUSTER_NN_KERBEROS_PRINCIPAL.getName()); - targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName()); - targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName()) - + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName()); - targetNN = conf.get(HiveDRArgs.TARGET_NN.getName()); - targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName()); - sourceCleanUpList = new ArrayList<Path>(); - targetCleanUpList = new ArrayList<Path>(); - countersMap = new HashMap<>(); - } - - public void setupConnection() throws Exception { - Class.forName(DRIVER_NAME); - DriverManager.setLoginTimeout(TIMEOUT_IN_SECS); - String authTokenString = ";auth=delegationToken"; - //To bypass findbugs check, need to store empty password in Properties. 
- Properties password = new Properties(); - password.put("password", ""); - String user = ""; - - UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); - if (currentUser != null) { - user = currentUser.getShortUserName(); - } - - if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) - .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { - String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase; - if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) { - connString += authTokenString; - } - sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password")); - sourceStatement = sourceConnection.createStatement(); - } else { - String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase; - if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) { - connString += authTokenString; - } - targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password")); - targetStatement = targetConnection.createStatement(); - } - } - - public void initializeFS() throws IOException { - LOG.info("Initializing staging directory"); - fullyQualifiedTargetStagingPath = new Path(targetNN, targetStagingPath).toString(); - sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal)); - jobFileSystem = FileSystem.get(FileUtils.getConfiguration(jobNN, jobNNKerberosPrincipal)); - targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal)); - } - - private String readEvents(Path eventFileName) throws IOException { - StringBuilder eventString = new StringBuilder(); - BufferedReader in = new BufferedReader(new InputStreamReader(jobFileSystem.open(eventFileName))); - try { - String line; - while ((line=in.readLine())!=null) { - eventString.append(line); - eventString.append(DelimiterUtils.NEWLINE_DELIM); - } - } catch (Exception e) { - throw new IOException(e); - } finally { - IOUtils.closeQuietly(in); - } - - return eventString.toString(); - } - - public void processEvents(String event) throws Exception { - listReplicationStatus = new ArrayList<ReplicationStatus>(); - String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM); - String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8"); - String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8"); - String exportEventStr; - String importEventStr; - if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) - .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { - exportEventStr = readEvents(new Path(eventSplit[2])); - if (StringUtils.isNotEmpty(exportEventStr)) { - LOG.info("Process the export statements for db {} table {}", dbName, tableName); - processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false); - if (!sourceCleanUpList.isEmpty()) { - invokeCopy(sourceCleanUpList); - } - } - } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) - .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { - importEventStr = readEvents(new Path(eventSplit[3])); - if (StringUtils.isNotEmpty(importEventStr)) { - LOG.info("Process the import statements for db {} table {}", dbName, tableName); - processCommands(importEventStr, dbName, tableName, targetStatement, targetCleanUpList, true); - } - } - } - - public List<ReplicationStatus> getListReplicationStatus() { - return listReplicationStatus; - } - - private void processCommands(String 
eventStr, String dbName, String tableName, Statement sqlStmt, - List<Path> cleanUpList, boolean isImportStatements) - throws SQLException, HiveReplicationException, IOException { - String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM); - List<Command> deserializeCommand = new ArrayList<Command>(); - for (String command : commandList) { - Command cmd = ReplicationUtils.deserializeCommand(command); - deserializeCommand.add(cmd); - List<String> cleanupLocations = cmd.cleanupLocationsAfterEvent(); - cleanUpList.addAll(getCleanUpPaths(cleanupLocations)); - } - for (Command cmd : deserializeCommand) { - try { - LOG.debug("Executing command : {} : {} ", cmd.getEventId(), cmd.toString()); - executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, 0); - } catch (Exception e) { - // clean up locations before failing. - cleanupEventLocations(sourceCleanUpList, sourceFileSystem); - cleanupEventLocations(targetCleanUpList, targetFileSystem); - throw new HiveReplicationException("Could not process replication command for " - + " DB Name:" + dbName + ", Table Name:" + tableName, e); - } - } - } - - private void executeCommand(Command cmd, String dbName, String tableName, - Statement sqlStmt, boolean isImportStatements, int attempt) - throws HiveReplicationException, SQLException, IOException { - for (final String stmt : cmd.get()) { - executeSqlStatement(cmd, dbName, tableName, sqlStmt, stmt, isImportStatements, attempt); - } - if (isImportStatements) { - addReplicationStatus(ReplicationStatus.Status.SUCCESS, dbName, tableName, cmd.getEventId()); - } - } - - private void executeSqlStatement(Command cmd, String dbName, String tableName, - Statement sqlStmt, String stmt, boolean isImportStatements, int attempt) - throws HiveReplicationException, SQLException, IOException { - try { - sqlStmt.execute(stmt); - } catch (SQLException sqeOuter) { - // Retry if command is retriable. - if (attempt < RETRY_ATTEMPTS && cmd.isRetriable()) { - if (isImportStatements) { - try { - cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), targetFileSystem); - } catch (IOException ioe) { - // Clean up failed before retry on target. Update failure status and return - addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, - tableName, cmd.getEventId()); - throw ioe; - } - } else { - cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), sourceFileSystem); - } - executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, ++attempt); - return; // Retry succeeded, return without throwing an exception. - } - // If we reached here, retries have failed. 
- LOG.error("SQL Exception: {}", sqeOuter); - undoCommand(cmd, dbName, tableName, sqlStmt, isImportStatements); - if (isImportStatements) { - addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, tableName, cmd.getEventId()); - } - throw sqeOuter; - } - } - - private static List<Path> getCleanUpPaths(List<String> cleanupLocations) { - List<Path> cleanupLocationPaths = new ArrayList<Path>(); - for (String cleanupLocation : cleanupLocations) { - cleanupLocationPaths.add(new Path(cleanupLocation)); - } - return cleanupLocationPaths; - } - - private void undoCommand(Command cmd, String dbName, - String tableName, Statement sqlStmt, boolean isImportStatements) - throws SQLException, HiveReplicationException { - if (cmd.isUndoable()) { - try { - List<String> undoCommands = cmd.getUndo(); - LOG.debug("Undo command: {}", StringUtils.join(undoCommands.toArray())); - if (undoCommands.size() != 0) { - for (final String undoStmt : undoCommands) { - sqlStmt.execute(undoStmt); - } - } - } catch (SQLException sqeInner) { - if (isImportStatements) { - addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, - tableName, cmd.getEventId()); - } - LOG.error("SQL Exception: {}", sqeInner); - throw sqeInner; - } - } - } - - private void addReplicationStatus(ReplicationStatus.Status status, String dbName, String tableName, long eventId) - throws HiveReplicationException { - try { - String drJobName = conf.get(HiveDRArgs.JOB_NAME.getName()); - ReplicationStatus rs = new ReplicationStatus(conf.get(HiveDRArgs.SOURCE_CLUSTER.getName()), - conf.get(HiveDRArgs.TARGET_CLUSTER.getName()), drJobName, dbName, tableName, status, eventId); - listReplicationStatus.add(rs); - } catch (HiveReplicationException hre) { - throw new HiveReplicationException("Could not update replication status store for " - + " EventId:" + eventId - + " DB Name:" + dbName - + " Table Name:" + tableName - + hre.toString()); - } - } - - public void invokeCopy(List<Path> srcStagingPaths) throws Exception { - DistCpOptions options = getDistCpOptions(srcStagingPaths); - DistCp distCp = new DistCp(conf, options); - LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()), - fullyQualifiedTargetStagingPath); - Job distcpJob = distCp.execute(); - LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString()); - LOG.info("Completed DistCp"); - if (distcpJob.getStatus().getState() == JobStatus.State.SUCCEEDED) { - countersMap = HiveDRUtils.fetchReplicationCounters(conf, distcpJob); - } - } - - public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) { - /* - * Add the fully qualified sourceNameNode to srcStagingPath uris. This will - * ensure DistCp will succeed when the job is run on target cluster. - */ - List<Path> fullyQualifiedSrcStagingPaths = new ArrayList<Path>(); - for (Path srcPath : srcStagingPaths) { - fullyQualifiedSrcStagingPaths.add(new Path(sourceNN, srcPath.toString())); - } - fullyQualifiedSrcStagingPaths.toArray(new Path[fullyQualifiedSrcStagingPaths.size()]); - - DistCpOptions distcpOptions = new DistCpOptions(fullyQualifiedSrcStagingPaths, - new Path(fullyQualifiedTargetStagingPath)); - /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be - copied to the same staging sir at target resulting in DuplicateFileException in DistCp. 
- */ - - distcpOptions.setSyncFolder(false); - distcpOptions.setBlocking(true); - distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName()))); - distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName()))); - return distcpOptions; - } - - public Long getCounterValue(String counterKey) { - return countersMap.get(counterKey); - } - - public boolean isCountersMapEmtpy() { - return countersMap.size() == 0 ? true : false; - } - - public void cleanEventsDirectory() throws IOException { - LOG.info("Cleaning staging directory"); - cleanupEventLocations(sourceCleanUpList, sourceFileSystem); - cleanupEventLocations(targetCleanUpList, targetFileSystem); - } - - private void cleanupEventLocations(List<Path> cleanupList, FileSystem fileSystem) - throws IOException { - for (Path cleanUpPath : cleanupList) { - try { - fileSystem.delete(cleanUpPath, true); - } catch (IOException ioe) { - LOG.error("Cleaning up of staging directory {} failed {}", cleanUpPath, ioe.toString()); - throw ioe; - } - } - - } - - public void closeConnection() throws SQLException { - if (sourceStatement != null) { - sourceStatement.close(); - } - - if (targetStatement != null) { - targetStatement.close(); - } - - if (sourceConnection != null) { - sourceConnection.close(); - } - if (targetConnection != null) { - targetConnection.close(); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java deleted file mode 100644 index 6bd6319..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java +++ /dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; - -import java.io.File; -import java.io.IOException; - -/** - * Utility class to validate HDFS files. 
- */ -public final class FileUtils { - - public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH - + File.separator + "Events"; - public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); - - - private FileUtils() {} - - public static Configuration getConfiguration(final String writeEP, final String nnKerberosPrincipal) { - Configuration conf = new Configuration(); - conf.set("fs.defaultFS", writeEP); - if (StringUtils.isNotEmpty(nnKerberosPrincipal)) { - conf.set("dfs.namenode.kerberos.principal", nnKerberosPrincipal); - } - return conf; - } - - public static void validatePath(final FileSystem fileSystem, final Path basePath) throws IOException { - if (!fileSystem.exists(basePath)) { - throw new IOException("Please create base dir " + fileSystem.getUri() + basePath - + ". Please set group to " + DRStatusStore.getStoreGroup() - + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString()); - } - - if (!fileSystem.getFileStatus(basePath).getPermission().equals(DRStatusStore.DEFAULT_STORE_PERMISSION) - || !fileSystem.getFileStatus(basePath).getGroup().equalsIgnoreCase(DRStatusStore.getStoreGroup())) { - throw new IOException("Base dir " + fileSystem.getUri() + basePath - + " does not have correct ownership/permissions." - + " Please set group to " + DRStatusStore.getStoreGroup() - + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString()); - } - - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java deleted file mode 100644 index 900afe8..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java +++ /dev/null @@ -1,315 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.hive.util; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.hadoop.fs.permission.FsAction; -import org.apache.hadoop.fs.permission.FsPermission; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; - -/** - * DRStatusStore implementation for hive. - */ -public class HiveDRStatusStore extends DRStatusStore { - - private static final Logger LOG = LoggerFactory.getLogger(DRStatusStore.class); - private FileSystem fileSystem; - - private static final String DEFAULT_STORE_PATH = BASE_DEFAULT_STORE_PATH + "hiveReplicationStatusStore/"; - private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION = - new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE); - - private static final String LATEST_FILE = "latest.json"; - private static final int FILE_ROTATION_LIMIT = 10; - private static final int FILE_ROTATION_TIME = 86400000; // 1 day - - - public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException { - init(targetFileSystem); - } - - public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException { - HiveDRStatusStore.setStoreGroup(group); - init(targetFileSystem); - } - - private void init(FileSystem targetFileSystem) throws IOException { - this.fileSystem = targetFileSystem; - Path basePath = new Path(BASE_DEFAULT_STORE_PATH); - FileUtils.validatePath(fileSystem, basePath); - - Path storePath = new Path(DEFAULT_STORE_PATH); - if (!fileSystem.exists(storePath)) { - if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) { - throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH); - } - } else { - if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) { - throw new IOException("Base dir " + DEFAULT_STORE_PATH + "does not have correct permissions. " - + "Please set to 777"); - } - } - } - - /** - get all DB updated by the job. get all current table statuses for the DB merge the latest repl - status with prev table repl statuses. If all statuses are success, store the status as success - with largest eventId for the DB else store status as failure for the DB and lowest eventId. - */ - @Override - public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList) - throws HiveReplicationException { - Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>(); - for (ReplicationStatus status : statusList) { - if (!status.getJobName().equals(jobName)) { - String error = "JobName for status does not match current job \"" + jobName - + "\". Status is " + status.toJsonString(); - LOG.error(error); - throw new HiveReplicationException(error); - } - - // init dbStatusMap and tableStatusMap from existing statuses. 
- if (!dbStatusMap.containsKey(status.getDatabase())) { - DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(), - status.getJobName(), status.getDatabase()); - dbStatusMap.put(status.getDatabase(), dbStatus); - } - - // update existing statuses with new status for db/tables - if (StringUtils.isEmpty(status.getTable())) { // db level replication status. - dbStatusMap.get(status.getDatabase()).updateDbStatus(status); - } else { // table level replication status - dbStatusMap.get(status.getDatabase()).updateTableStatus(status); - } - } - // write to disk - for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) { - writeStatusFile(entry.getValue()); - } - } - - @Override - public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database) - throws HiveReplicationException { - return getReplicationStatus(source, target, jobName, database, null); - } - - - public ReplicationStatus getReplicationStatus(String source, String target, - String jobName, String database, - String table) throws HiveReplicationException { - if (StringUtils.isEmpty(table)) { - return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus(); - } else { - return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table); - } - } - - @Override - public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target, - String jobName, String database) - throws HiveReplicationException { - DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database); - return dbReplicationStatus.getTableStatusIterator(); - } - - @Override - public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException { - Path deletePath = getStatusDirPath(database, jobName); - try { - if (fileSystem.exists(deletePath)) { - fileSystem.delete(deletePath, true); - } - } catch (IOException e) { - throw new HiveReplicationException("Failed to delete status for Job " - + jobName + " and DB "+ database, e); - } - - } - - private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName, - String database) throws HiveReplicationException{ - DBReplicationStatus dbReplicationStatus = null; - Path statusDirPath = getStatusDirPath(database, jobName); - // check if database name or jobName can contain chars not allowed by hdfs dir/file naming. - // if yes, use md5 of the same for dir names. prefer to use actual db names for readability. 
- - try { - if (fileSystem.exists(statusDirPath)) { - dbReplicationStatus = readStatusFile(statusDirPath); - } - if (null == dbReplicationStatus) { - // Init replication state for this database - ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName, - database, null, ReplicationStatus.Status.INIT, -1); - dbReplicationStatus = new DBReplicationStatus(initDbStatus); - if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) { - String error = "mkdir failed for " + statusDirPath.toString(); - LOG.error(error); - throw new HiveReplicationException(error); - } - writeStatusFile(dbReplicationStatus); - } - return dbReplicationStatus; - } catch (IOException e) { - String error = "Failed to get ReplicationStatus for job " + jobName; - LOG.error(error); - throw new HiveReplicationException(error); - } - } - - private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) { - ReplicationStatus status = dbReplicationStatus.getDatabaseStatus(); - return getStatusDirPath(status.getDatabase(), status.getJobName()); - } - - public Path getStatusDirPath(String database, String jobName) { - return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName); - } - - private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException { - dbReplicationStatus.updateDbStatusFromTableStatuses(); - String statusDir = getStatusDirPath(dbReplicationStatus).toString(); - try { - Path latestFile = new Path(statusDir + "/" + LATEST_FILE); - if (fileSystem.exists(latestFile)) { - Path renamedFile = new Path(statusDir + "/" - + String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json"); - fileSystem.rename(latestFile, renamedFile); - } - - FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION); - stream.write(dbReplicationStatus.toJsonString().getBytes()); - stream.close(); - - } catch (IOException e) { - String error = "Failed to write latest Replication status into dir " + statusDir; - LOG.error(error); - throw new HiveReplicationException(error); - } - - rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME); - } - - public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException { - - List<String> fileList = new ArrayList<String>(); - long now = System.currentTimeMillis(); - try { - RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false); - while (fileIterator.hasNext()) { - fileList.add(fileIterator.next().getPath().toString()); - } - if (fileList.size() > (numFiles+1)) { - // delete some files, as long as they are older than the time. 
- Collections.sort(fileList); - for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) { - long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime(); - if ((now - modTime) > maxFileAge) { - Path deleteFilePath = new Path(file); - if (fileSystem.exists(deleteFilePath)) { - fileSystem.delete(deleteFilePath, false); - } - } - } - } - } catch (IOException e) { - String error = "Failed to rotate status files in dir " + statusDir.toString(); - LOG.error(error); - throw new HiveReplicationException(error); - } - } - - private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException { - try { - Path statusFile = new Path(statusDirPath.toString() + "/" + LATEST_FILE); - if ((!fileSystem.exists(statusDirPath)) || (!fileSystem.exists(statusFile))) { - return null; - } else { - return new DBReplicationStatus(IOUtils.toString(fileSystem.open(statusFile))); - } - } catch (IOException e) { - String error = "Failed to read latest Replication status from dir " + statusDirPath.toString(); - LOG.error(error); - throw new HiveReplicationException(error); - } - } - - public void checkForReplicationConflict(String newSource, String jobName, - String database, String table) throws HiveReplicationException { - try { - Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json"); - FileStatus[] files = fileSystem.globStatus(globPath); - for(FileStatus file : files) { - DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString( - fileSystem.open(file.getPath()))); - ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus(); - - if (!(newSource.equals(existingJob.getSourceUri()))) { - throw new HiveReplicationException("Two different sources are attempting to replicate to same db " - + database + ". New Source = " + newSource - + ", Existing Source = " + existingJob.getSourceUri()); - } // two different sources replicating to same DB. Conflict - if (jobName.equals(existingJob.getJobName())) { - continue; - } // same job, no conflict. - - if (StringUtils.isEmpty(table)) { - // When it is DB level replication, two different jobs cannot replicate to same DB - throw new HiveReplicationException("Two different jobs are attempting to replicate to same db " - + database.toLowerCase() + ". New Job = " + jobName - + ", Existing Job = " + existingJob.getJobName()); - } - - /* - At this point, it is different table level jobs replicating from same newSource to same target. This is - allowed as long as the target tables are different. For example, job1 can replicate db1.table1 and - job2 can replicate db1.table2. Both jobs cannot replicate to same table. - */ - for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) { - if (table.equals(entry.getKey())) { - throw new HiveReplicationException("Two different jobs are trying to replicate to same table " - + entry.getKey() + ". 
New job = " + jobName - + ", Existing job = " + existingJob.getJobName()); - } - } - } - } catch (IOException e) { - throw new HiveReplicationException("Failed to read status files for DB " - + database, e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java deleted file mode 100644 index dff0803..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.falcon.job.JobCounters; -import org.apache.falcon.job.JobCountersHandler; -import org.apache.falcon.job.JobType; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapreduce.Job; -import org.apache.hadoop.util.Shell; - -import java.io.File; -import java.io.IOException; -import java.util.List; -import java.util.Map; - -/** - * Hive replication utility class. - */ -public final class HiveDRUtils { - /** - * Enum for Hive replication type. - */ - public enum ReplicationType { - TABLE, - DB - } - - /** - * Enum for hive-dr action type. - */ - public enum ExecutionStage { - IMPORT, - EXPORT, - LASTEVENTS - } - - private static final String ALL_TABLES = "*"; - - public static final String SEPARATOR = File.separator; - - private HiveDRUtils() {} - - public static ReplicationType getReplicationType(List<String> sourceTables) { - return (sourceTables.size() == 1 && sourceTables.get(0).equals(ALL_TABLES)) ? 
ReplicationType.DB - : ReplicationType.TABLE; - } - - public static Configuration getDefaultConf() throws IOException { - Configuration conf = new Configuration(); - conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml"))); - String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION"); - if (delegationToken != null) { - conf.set("mapreduce.job.credentials.binary", delegationToken); - conf.set("tez.credentials.path", delegationToken); - } - return conf; - } - - public static String getFilePathFromEnv(String env) { - String path = System.getenv(env); - if (path != null && Shell.WINDOWS) { - // In Windows, file paths are enclosed in \" so remove them here - // to avoid path errors - if (path.charAt(0) == '"') { - path = path.substring(1); - } - if (path.charAt(path.length() - 1) == '"') { - path = path.substring(0, path.length() - 1); - } - } - return path; - } - - public static Map<String, Long> fetchReplicationCounters(Configuration conf, - Job job) throws IOException, InterruptedException { - JobCounters hiveReplicationCounters = JobCountersHandler.getCountersType( - JobType.HIVEREPLICATION.name()); - hiveReplicationCounters.obtainJobCounters(conf, job, true); - return hiveReplicationCounters.getCountersMap(); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java deleted file mode 100644 index ea19f09..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java +++ /dev/null @@ -1,92 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hive.hcatalog.api.HCatClient; -import org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory; -import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; - -/** - * Create hive metastore client for user. 
- */ -public final class HiveMetastoreUtils { - - private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreUtils.class); - - private HiveMetastoreUtils() {} - - public static HCatClient initializeHiveMetaStoreClient(String metastoreUri, String metastorePrincipal, - String hive2Principal) throws Exception { - try { - HiveConf hcatConf = createHiveConf(HiveDRUtils.getDefaultConf(), - metastoreUri, metastorePrincipal, hive2Principal); - HCatClient client = HCatClient.create(hcatConf); - return client; - } catch (IOException e) { - throw new Exception("Exception creating HCatClient: " + e.getMessage(), e); - } - } - - private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal, - String hive2Principal) throws IOException { - JobConf jobConf = new JobConf(conf); - String delegationToken = HiveDRUtils.getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION"); - if (delegationToken != null) { - Credentials credentials = Credentials.readTokenStorageFile(new File(delegationToken), conf); - jobConf.setCredentials(credentials); - UserGroupInformation.getCurrentUser().addCredentials(credentials); - } - - HiveConf hcatConf = new HiveConf(jobConf, HiveConf.class); - - hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl); - hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); - hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, - HCatSemanticAnalyzer.class.getName()); - hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); - - hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); - hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); - hcatConf.set(HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname, EximReplicationTaskFactory.class.getName()); - if (StringUtils.isNotEmpty(metastorePrincipal)) { - hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal); - hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true"); - hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true"); - hcatConf.set("hadoop.rpc.protection", "authentication"); - } - if (StringUtils.isNotEmpty(hive2Principal)) { - hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal); - hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos"); - } - - return hcatConf; - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java deleted file mode 100644 index bb33772..0000000 --- a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java +++ /dev/null @@ -1,221 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.hive.util; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.hive.exception.HiveReplicationException; -import org.codehaus.jettison.json.JSONException; -import org.codehaus.jettison.json.JSONObject; - -/** - * Object to store replication status of a DB or a table. - */ -public class ReplicationStatus { - - public static final int INDENT_FACTOR = 4; - private static final String SOURCE = "sourceUri"; - private static final String TARGET = "targetUri"; - private static final String JOB_NAME = "jobName"; - private static final String DATABASE = "database"; - private static final String TABLE = "table"; - private static final String EVENT_ID = "eventId"; - private static final String STATUS_KEY = "status"; - private static final String STATUS_LOG = "statusLog"; - - /** - * Replication Status enum. - */ - public static enum Status { - INIT, - SUCCESS, - FAILURE - } - - private String sourceUri; - private String targetUri; - private String jobName; - private String database; - private String table; - private Status status = Status.SUCCESS; - private long eventId = -1; - private String log; - - //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck - public ReplicationStatus(String sourceUri, String targetUri, String jobName, - String database, String table, - ReplicationStatus.Status status, long eventId) throws HiveReplicationException { - init(sourceUri, targetUri, jobName, database, table, status, eventId, null); - } - - private void init(String source, String target, String job, - String dbName, String tableName, ReplicationStatus.Status replStatus, - long eventNum, String logStr) throws HiveReplicationException { - setSourceUri(source); - setTargetUri(target); - setJobName(job); - setDatabase(dbName); - setTable(tableName); - setStatus(replStatus); - setEventId(eventNum); - setLog(logStr); - } - //RESUME CHECKSTYLE CHECK ParameterNumberCheck - - public ReplicationStatus(String jsonString) throws HiveReplicationException { - try { - JSONObject object = new JSONObject(jsonString); - Status objectStatus; - try { - objectStatus = ReplicationStatus.Status.valueOf(object.getString(STATUS_KEY).toUpperCase()); - } catch (IllegalArgumentException e1) { - throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus." - + " Invalid status " + object.getString(STATUS_KEY), e1); - } - - init(object.getString(SOURCE), object.getString(TARGET), object.getString(JOB_NAME), - object.getString(DATABASE), object.has(TABLE) ? object.getString(TABLE) : null, - objectStatus, object.has(EVENT_ID) ? object.getLong(EVENT_ID) : -1, - object.has(STATUS_LOG) ? 
object.getString(STATUS_LOG) : null); - } catch (JSONException e) { - throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus ", e); - } - - } - - public String toJsonString() throws HiveReplicationException { - try { - return toJsonObject().toString(INDENT_FACTOR); - } catch (JSONException e) { - throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e); - } - } - - public JSONObject toJsonObject() throws HiveReplicationException { - JSONObject jsonObject = new JSONObject(); - try { - jsonObject.put(SOURCE, this.sourceUri); - jsonObject.put(TARGET, this.targetUri); - jsonObject.put(JOB_NAME, this.jobName); - jsonObject.put(DATABASE, this.database); - if (StringUtils.isNotEmpty(this.table)) { - jsonObject.put(TABLE, this.table); - } - jsonObject.put(STATUS_KEY, this.status.name()); - if (this.eventId > -1) { - jsonObject.put(EVENT_ID, this.eventId); - } else { - jsonObject.put(EVENT_ID, -1); - } - if (StringUtils.isNotEmpty(this.log)) { - jsonObject.put(STATUS_LOG, this.log); - } - return jsonObject; - } catch (JSONException e) { - throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e); - } - } - - public String getSourceUri() { - return this.sourceUri; - } - - public void setSourceUri(String source) throws HiveReplicationException { - validateString(SOURCE, source); - this.sourceUri = source; - } - - public String getTargetUri() { - return this.targetUri; - } - - public void setTargetUri(String target) throws HiveReplicationException { - validateString(TARGET, target); - this.targetUri = target; - } - - public String getJobName() { - return this.jobName; - } - - public void setJobName(String jobName) throws HiveReplicationException { - validateString(JOB_NAME, jobName); - this.jobName = jobName; - } - - public String getDatabase() { - return this.database; - } - - public void setDatabase(String database) throws HiveReplicationException { - validateString(DATABASE, database); - this.database = database.toLowerCase(); - } - - public String getTable() { - return this.table; - } - - public void setTable(String table) { - this.table = (table == null) ? null : table.toLowerCase(); - } - - public Status getStatus() { - return this.status; - } - - public void setStatus(Status status) throws HiveReplicationException { - if (status != null) { - this.status = status; - } else { - throw new HiveReplicationException("Failed to set ReplicationStatus. Input \"" - + STATUS_KEY + "\" cannot be empty"); - } - } - - public long getEventId() { - return this.eventId; - } - - public void setEventId(long eventId) throws HiveReplicationException { - if (eventId > -1) { - this.eventId = eventId; - } - } - - public String getLog() { - return this.log; - } - - public void setLog(String log) { - this.log = log; - } - - private void validateString(String inputName, String input) throws HiveReplicationException { - if (StringUtils.isEmpty(input)) { - throw new HiveReplicationException("Failed to set ReplicationStatus. 
Input \"" - + inputName + "\" cannot be empty"); - } - } - - public String toString() { - return sourceUri + "\t" + targetUri + "\t" + jobName + "\t" - + database + "\t"+ table + "\t" + status + "\t"+ eventId; - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/hivedr/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/resources/log4j.xml b/addons/hivedr/src/main/resources/log4j.xml deleted file mode 100644 index f83a9a9..0000000 --- a/addons/hivedr/src/main/resources/log4j.xml +++ /dev/null @@ -1,54 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" ?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!-- - This is used for falcon packaging only. - --> - -<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> - -<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> - <appender name="console" class="org.apache.log4j.ConsoleAppender"> - <param name="Target" value="System.out"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/> - </layout> - </appender> - - <logger name="org.apache.falcon" additivity="false"> - <level value="debug"/> - <appender-ref ref="console"/> - </logger> - - <logger name="org.apache.hadoop" additivity="false"> - <level value="info"/> - <appender-ref ref="console"/> - </logger> - - <logger name="org.apache.hadoop.hive" additivity="false"> - <level value="info"/> - <appender-ref ref="console"/> - </logger> - - <root> - <priority value="info"/> - <appender-ref ref="console"/> - </root> - -</log4j:configuration>

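Likewise, a hedged sketch of how the deleted HiveMetastoreUtils helper would be invoked to obtain an HCatClient. The metastore URI and Kerberos principals below are hypothetical; on an unsecured cluster the principals may be passed as null or empty, per the StringUtils.isNotEmpty checks in createHiveConf. Because HiveDRUtils.getDefaultConf() reads the oozie.action.conf.xml system property, this is only expected to work inside an Oozie action.

    import org.apache.hive.hcatalog.api.HCatClient;
    import org.apache.falcon.hive.util.HiveMetastoreUtils;

    public class MetastoreClientSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical endpoint and principals; intended to run inside an Oozie action,
            // where oozie.action.conf.xml and HADOOP_TOKEN_FILE_LOCATION are available.
            HCatClient client = HiveMetastoreUtils.initializeHiveMetaStoreClient(
                    "thrift://source-metastore:9083",
                    "hive/_HOST@EXAMPLE.COM",
                    "hive/_HOST@EXAMPLE.COM");
            try {
                System.out.println(client.getDatabase("default").getName());
            } finally {
                client.close();
            }
        }
    }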