http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java new file mode 100644 index 0000000..98449f0 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyCommitter.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.mapreduce; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Copy committer class. + */ +public class CopyCommitter extends FileOutputCommitter { + + private static final Logger LOG = LoggerFactory.getLogger(CopyCommitter.class); + + /** + * Create a file output committer. + * + * @param outputPath the job's output path, or null if you want the output + * committer to act as a noop. + * @param context the task's context + * @throws java.io.IOException + */ + public CopyCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + @Override + public void commitJob(JobContext jobContext) throws IOException { + Configuration conf = jobContext.getConfiguration(); + + try { + super.commitJob(jobContext); + } finally { + cleanup(conf); + } + } + + private void cleanup(Configuration conf) { + // clean up staging and other data + } +}
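CopyCommitter above only overrides commitJob() so that staging data can be cleaned up after the standard FileOutputCommitter commit runs. For context, a committer like this is normally handed to the MapReduce framework by a custom output format; the sketch below shows that usual wiring against the stock Hadoop API. It is illustrative only: the CopyOutputFormat class name is an assumption and is not part of the diff shown here.

import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

// Illustrative only: expose CopyCommitter through an output format so the
// framework calls commitJob() (and thus cleanup()) when the copy job finishes.
public class CopyOutputFormat extends TextOutputFormat<Text, Text> {
    @Override
    public synchronized OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
        // getOutputPath() is inherited from FileOutputFormat; a null path would
        // make the committer act as a no-op, per the CopyCommitter javadoc.
        return new CopyCommitter(getOutputPath(context), context);
    }
}

A driver would then register it with job.setOutputFormatClass(CopyOutputFormat.class), so the cleanup in commitJob() runs once at job completion rather than per task.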
http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java new file mode 100644 index 0000000..5eb8acb --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyMapper.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.mapreduce; + +import org.apache.falcon.hive.HiveDRArgs; +import org.apache.falcon.hive.util.EventUtils; +import org.apache.falcon.hive.util.HiveDRUtils; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Mapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +/** + * Map class for Hive DR. 
+ */ +public class CopyMapper extends Mapper<LongWritable, Text, Text, Text> { + + private static final Logger LOG = LoggerFactory.getLogger(CopyMapper.class); + private EventUtils eventUtils; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + eventUtils = new EventUtils(context.getConfiguration()); + eventUtils.initializeFS(); + try { + eventUtils.setupConnection(); + } catch (Exception e) { + throw new IOException(e); + } + } + + @Override + protected void map(LongWritable key, Text value, + Context context) throws IOException, InterruptedException { + LOG.debug("Processing Event value: {}", value.toString()); + + try { + eventUtils.processEvents(value.toString()); + } catch (Exception e) { + LOG.error("Exception in processing events:", e); + throw new IOException(e); + } finally { + cleanup(context); + } + List<ReplicationStatus> replicationStatusList = eventUtils.getListReplicationStatus(); + if (replicationStatusList != null && !replicationStatusList.isEmpty()) { + for (ReplicationStatus rs : replicationStatusList) { + context.write(new Text(rs.getJobName()), new Text(rs.toString())); + } + } + } + + protected void cleanup(Context context) throws IOException, InterruptedException { + LOG.info("Invoking cleanup process"); + super.cleanup(context); + try { + if (context.getConfiguration().get(HiveDRArgs.EXECUTION_STAGE.getName()) + .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { + eventUtils.cleanEventsDirectory(); + } + } catch (IOException e) { + LOG.error("Cleaning up of events directories failed", e); + } finally { + try { + eventUtils.closeConnection(); + } catch (SQLException e) { + LOG.error("Closing the connections failed", e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java new file mode 100644 index 0000000..50cb4b2 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/mapreduce/CopyReducer.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.hive.mapreduce; + + +import org.apache.falcon.hive.HiveDRArgs; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.falcon.hive.util.DRStatusStore; +import org.apache.falcon.hive.util.FileUtils; +import org.apache.falcon.hive.util.HiveDRStatusStore; +import org.apache.falcon.hive.util.ReplicationStatus; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Reducer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * Reducer class for Hive DR. + */ +public class CopyReducer extends Reducer<Text, Text, Text, Text> { + private DRStatusStore hiveDRStore; + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + Configuration conf = context.getConfiguration(); + FileSystem fs= FileSystem.get(FileUtils.getConfiguration( + conf.get(HiveDRArgs.TARGET_NN.getName()), + conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName()))); + hiveDRStore = new HiveDRStatusStore(fs); + } + + private List<ReplicationStatus> sortStatusList(List<ReplicationStatus> replStatusList) { + Collections.sort(replStatusList, new Comparator<ReplicationStatus>() { + @Override + public int compare(ReplicationStatus r1, ReplicationStatus r2) { + return (int) (r1.getEventId() - r2.getEventId()); + } + }); + return replStatusList; + } + + @Override + protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { + List<ReplicationStatus> replStatusList = new ArrayList<ReplicationStatus>(); + ReplicationStatus rs; + try { + for (Text value : values) { + String[] fields = (value.toString()).split("\t"); + rs = new ReplicationStatus(fields[0], fields[1], fields[2], fields[3], fields[4], + ReplicationStatus.Status.valueOf(fields[5]), Long.parseLong(fields[6])); + replStatusList.add(rs); + } + + hiveDRStore.updateReplicationStatus(key.toString(), sortStatusList(replStatusList)); + } catch (HiveReplicationException e) { + throw new IOException(e); + } + } + + @Override + protected void cleanup(Context context) throws IOException, InterruptedException { + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java new file mode 100644 index 0000000..6dceb8e --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DBReplicationStatus.java @@ -0,0 +1,213 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** + * Class to store replication status of a DB and it's tables. + */ +public class DBReplicationStatus { + + private static final Logger LOG = LoggerFactory.getLogger(DBReplicationStatus.class); + private static final String DB_STATUS = "db_status"; + private static final String TABLE_STATUS = "table_status"; + + private Map<String, ReplicationStatus> tableStatuses = new HashMap<String, ReplicationStatus>(); + private ReplicationStatus databaseStatus; + + public DBReplicationStatus(ReplicationStatus dbStatus) throws HiveReplicationException { + setDatabaseStatus(dbStatus); + } + + public DBReplicationStatus(ReplicationStatus dbStatus, + Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException { + /* + The order of set method calls is important to ensure tables that do not belong to same db + are not added to this DBReplicationStatus + */ + setDatabaseStatus(dbStatus); + setTableStatuses(tableStatuses); + } + + // Serialize + public String toJsonString() throws HiveReplicationException { + JSONObject retObject = new JSONObject(); + JSONObject tableStatus = new JSONObject(); + try { + for (Map.Entry<String, ReplicationStatus> status : tableStatuses.entrySet()) { + tableStatus.put(status.getKey(), status.getValue().toJsonObject()); + } + retObject.put(DB_STATUS, databaseStatus.toJsonObject()); + retObject.put(TABLE_STATUS, tableStatus); + return retObject.toString(ReplicationStatus.INDENT_FACTOR); + } catch (JSONException e) { + throw new HiveReplicationException("Unable to serialize Database Replication Status", e); + } + } + + // de-serialize + public DBReplicationStatus(String jsonString) throws HiveReplicationException { + try { + JSONObject object = new JSONObject(jsonString); + ReplicationStatus dbstatus = new ReplicationStatus(object.get(DB_STATUS).toString()); + setDatabaseStatus(dbstatus); + + JSONObject tableJson = object.getJSONObject(TABLE_STATUS); + Iterator keys = tableJson.keys(); + while(keys.hasNext()) { + String key = keys.next().toString(); + ReplicationStatus value = new ReplicationStatus(tableJson.get(key).toString()); + if (value.getDatabase().equals(dbstatus.getDatabase())) { + tableStatuses.put(key.toLowerCase(), value); + } else { + throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString. " + + "Cannot set status for table " + value.getDatabase() + "." 
+ value.getTable() + + ", It does not belong to DB " + dbstatus.getDatabase()); + } + } + } catch (JSONException e) { + throw new HiveReplicationException("Unable to create DBReplicationStatus from JsonString", e); + } + } + + public Map<String, ReplicationStatus> getTableStatuses() { + return tableStatuses; + } + + public ReplicationStatus getTableStatus(String tableName) throws HiveReplicationException { + tableName = tableName.toLowerCase(); + if (tableStatuses.containsKey(tableName)) { + return tableStatuses.get(tableName); + } + return new ReplicationStatus(databaseStatus.getSourceUri(), databaseStatus.getTargetUri(), + databaseStatus.getJobName(), databaseStatus.getDatabase(), + tableName, ReplicationStatus.Status.INIT, -1); + } + + public Iterator<ReplicationStatus> getTableStatusIterator() { + List<ReplicationStatus> resultSet = new ArrayList<ReplicationStatus>(); + for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) { + resultSet.add(entry.getValue()); + } + return resultSet.iterator(); + } + + private void setTableStatuses(Map<String, ReplicationStatus> tableStatuses) throws HiveReplicationException { + for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) { + if (!entry.getValue().getDatabase().equals(databaseStatus.getDatabase())) { + throw new HiveReplicationException("Cannot set status for table " + entry.getValue().getDatabase() + + "." + entry.getValue().getTable() + ", It does not belong to DB " + + databaseStatus.getDatabase()); + } else { + this.tableStatuses.put(entry.getKey().toLowerCase(), entry.getValue()); + } + } + } + + public ReplicationStatus getDatabaseStatus() { + return databaseStatus; + } + + private void setDatabaseStatus(ReplicationStatus databaseStatus) { + this.databaseStatus = databaseStatus; + } + + /** + * Update DB status from table statuses. + case 1) All tables replicated successfully. + Take the largest successful eventId and set dbReplStatus as success + case 2) One or many tables failed to replicate + Take the smallest eventId amongst the failed tables and set dbReplStatus as failed. + */ + public void updateDbStatusFromTableStatuses() throws HiveReplicationException { + if (tableStatuses.size() == 0) { + // nothing to do + return; + } + + databaseStatus.setStatus(ReplicationStatus.Status.SUCCESS); + long successEventId = databaseStatus.getEventId(); + long failedEventId = -1; + + for (Map.Entry<String, ReplicationStatus> entry : tableStatuses.entrySet()) { + long eventId = entry.getValue().getEventId(); + if (entry.getValue().getStatus().equals(ReplicationStatus.Status.SUCCESS)) { + if (eventId > successEventId) { + successEventId = eventId; + } + } else if (entry.getValue().getStatus().equals(ReplicationStatus.Status.FAILURE)) { + databaseStatus.setStatus(ReplicationStatus.Status.FAILURE); + if (eventId < failedEventId || failedEventId == -1) { + failedEventId = eventId; + } + } //else , if table status is Status.INIT, it should not change lastEventId of DB + } + + String log = "Updating DB Status based on table replication status. 
Status : " + + databaseStatus.getStatus().toString() + ", eventId : "; + if (databaseStatus.getStatus().equals(ReplicationStatus.Status.SUCCESS)) { + databaseStatus.setEventId(successEventId); + LOG.info(log + String.valueOf(successEventId)); + } else if (databaseStatus.getStatus().equals(ReplicationStatus.Status.FAILURE)) { + databaseStatus.setEventId(failedEventId); + LOG.error(log + String.valueOf(failedEventId)); + } + + } + + public void updateDbStatus(ReplicationStatus status) throws HiveReplicationException { + if (StringUtils.isNotEmpty(status.getTable())) { + throw new HiveReplicationException("Cannot update DB Status. This is table level status."); + } + + if (this.databaseStatus.getDatabase().equals(status.getDatabase())) { + this.databaseStatus = status; + } else { + throw new HiveReplicationException("Cannot update Database Status. StatusDB " + + status.getDatabase() + " does not match current DB " + + this.databaseStatus.getDatabase()); + } + } + + public void updateTableStatus(ReplicationStatus status) throws HiveReplicationException { + if (StringUtils.isEmpty(status.getTable())) { + throw new HiveReplicationException("Cannot update Table Status. Table name is empty."); + } + + if (this.databaseStatus.getDatabase().equals(status.getDatabase())) { + this.tableStatuses.put(status.getTable(), status); + } else { + throw new HiveReplicationException("Cannot update Table Status. TableDB " + + status.getDatabase() + " does not match current DB " + + this.databaseStatus.getDatabase()); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java new file mode 100644 index 0000000..cf6b7ad --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DRStatusStore.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import java.util.Iterator; +import java.util.List; + +/** + * Abstract class for Data Replication Status Store. 
+ */ +public abstract class DRStatusStore { + + public static final String BASE_DEFAULT_STORE_PATH = "/apps/data-mirroring/"; + public static final FsPermission DEFAULT_STORE_PERMISSION = + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE); + + private static String storeGroup = "users"; + + + /** + * Update replication status of a table(s)/db after replication job jobName completes. + * @param jobName Name of the replication job. + * @param statusList List of replication statuses of db/tables replicated by jobName. + */ + public abstract void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList) + throws HiveReplicationException; + + /** + * Get Replication status for a database. + * @param source Replication source uri. + * @param target Replication target uri. + * @param jobName Name of the replication job. + * @param database Name of the target database. + * @return ReplicationStatus + * destination commands for each table + */ + public abstract ReplicationStatus getReplicationStatus(String source, String target, + String jobName, String database) + throws HiveReplicationException; + + /** + * Get Replication status for a table. + * @param source Replication source uri. + * @param target Replication target uri. + * @param jobName Name of the replication job. + * @param database Name of the target database. + * @param table Name of the target table. + * @return ReplicationStatus + * destination commands for each table + */ + public abstract ReplicationStatus getReplicationStatus(String source, String target, + String jobName, String database, + String table) throws HiveReplicationException; + + /** + * Get Replication status of all tables in a database. + * @param source Replication source uri. + * @param target Replication target uri. + * @param jobName Name of the replication job. + * @param database Name of the target database. + * @return Iterator + * destination commands for each table + */ + public abstract Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target, + String jobName, String database) + throws HiveReplicationException; + + + /** + * Delete a replication job. + * @param jobName Name of the replication job. + * @param database Name of the target database. + * destination commands for each table + */ + public abstract void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException; + + public static String getStoreGroup() { + return storeGroup; + } + + public static void setStoreGroup(String group) { + storeGroup = group; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java new file mode 100644 index 0000000..3b3156f --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/DelimiterUtils.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +/** + * Public delimiters used for event processing. + */ +public final class DelimiterUtils { + public static final String FIELD_DELIM = "\u0001"; + public static final String NEWLINE_DELIM = System.getProperty("line.separator"); + public static final String TAB_DELIM = "\t"; + + private DelimiterUtils() {} +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java new file mode 100644 index 0000000..fb695d0 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventSourcerUtils.java @@ -0,0 +1,189 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.ReplicationEventMetadata; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Map; + +/** + * Utility methods for event sourcer. 
+ */ +public class EventSourcerUtils { + + private static final String METAFILE_EXTENSION = ".meta"; + private static final String SRCFILE_EXTENSION = ".src"; + private static final String TGTFILE_EXTENSION = ".tgt"; + private Path eventsInputDirPath; + private final boolean shouldKeepHistory; + private final FileSystem jobFS; + + private static final Logger LOG = LoggerFactory.getLogger(EventSourcerUtils.class); + + public EventSourcerUtils(final Configuration conf, final boolean shouldKeepHistory, + final String jobName) throws Exception { + this.shouldKeepHistory = shouldKeepHistory; + jobFS = FileSystem.get(conf); + init(jobName); + } + + private void init(final String jobName) throws Exception { + // Create base dir to store events on cluster where job is running + Path dir = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH); + // Validate base path + FileUtils.validatePath(jobFS, new Path(DRStatusStore.BASE_DEFAULT_STORE_PATH)); + + if (!jobFS.exists(dir)) { + if (!jobFS.mkdirs(dir)) { + throw new Exception("Creating directory failed: " + dir); + } + } + + eventsInputDirPath = new Path(FileUtils.DEFAULT_EVENT_STORE_PATH, jobName); + + if (!jobFS.exists(eventsInputDirPath)) { + if (!jobFS.mkdirs(eventsInputDirPath)) { + throw new Exception("Creating directory failed: " + eventsInputDirPath); + } + } + } + + public OutputStream getFileOutputStream(final String path) throws Exception { + return FileSystem.create(jobFS, new Path(path), FileUtils.FS_PERMISSION_700); + } + + public void closeOutputStream(OutputStream out) throws IOException { + if (out != null) { + try { + out.flush(); + } finally { + IOUtils.closeQuietly(out); + } + } + } + + public void persistReplicationEvents(final OutputStream out, + final java.lang.Iterable + <? extends org.apache.hive.hcatalog.api.repl.Command> cmds) + throws Exception { + for (Command cmd : cmds) { + persistReplicationEvents(out, cmd); + } + } + + public void persistReplicationEvents(final OutputStream out, + final Command cmd) throws Exception { + out.write(ReplicationUtils.serializeCommand(cmd).getBytes()); + LOG.debug("HiveDR Serialized Repl Command : {}", cmd); + out.write(DelimiterUtils.NEWLINE_DELIM.getBytes()); + } + + public String persistToMetaFile(final ReplicationEventMetadata data, final String identifier) throws IOException { + if (data != null && data.getEventFileMetadata() != null && !data.getEventFileMetadata().isEmpty()) { + Path metaFilename = new Path(eventsInputDirPath.toString(), identifier + METAFILE_EXTENSION); + OutputStream out = null; + + try { + out = FileSystem.create(jobFS, metaFilename, FileUtils.FS_PERMISSION_700); + + for (Map.Entry<String, String> entry : data.getEventFileMetadata().entrySet()) { + out.write(entry.getKey().getBytes()); + out.write(DelimiterUtils.FIELD_DELIM.getBytes()); + out.write(entry.getValue().getBytes()); + out.write(DelimiterUtils.NEWLINE_DELIM.getBytes()); + } + out.flush(); + } finally { + IOUtils.closeQuietly(out); + } + return jobFS.makeQualified(metaFilename).toString(); + } else { + return null; + } + } + + public static void updateEventMetadata(ReplicationEventMetadata data, final String dbName, final String tableName, + final String srcFilename, final String tgtFilename) { + if (data == null || data.getEventFileMetadata() == null) { + return; + } + StringBuilder key = new StringBuilder(); + + if (StringUtils.isNotEmpty(dbName)) { + key.append(Base64.encodeBase64URLSafeString(dbName.toLowerCase().getBytes())); + } + key.append(DelimiterUtils.FIELD_DELIM); + if 
(StringUtils.isNotEmpty(tableName)) { + key.append(Base64.encodeBase64URLSafeString(tableName.toLowerCase().getBytes())); + } + + StringBuilder value = new StringBuilder(); + if (StringUtils.isNotEmpty(srcFilename)) { + value.append(srcFilename); + } + value.append(DelimiterUtils.FIELD_DELIM); + + if (StringUtils.isNotEmpty(tgtFilename)) { + value.append(tgtFilename); + } + + data.getEventFileMetadata().put(key.toString(), value.toString()); + } + + public static void updateEventMetadata(ReplicationEventMetadata data, final ReplicationEventMetadata inputData) { + if (data == null || data.getEventFileMetadata() == null || inputData == null + || inputData.getEventFileMetadata() == null || inputData.getEventFileMetadata().isEmpty()) { + return; + } + + data.getEventFileMetadata().putAll(inputData.getEventFileMetadata()); + } + + public Path getSrcFileName(final String identifier) { + return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + SRCFILE_EXTENSION)); + } + + public Path getTargetFileName(final String identifier) { + return jobFS.makeQualified(new Path(eventsInputDirPath, identifier + TGTFILE_EXTENSION)); + } + + public void cleanUpEventInputDir() { + if (!shouldKeepHistory) { + try { + jobFS.delete(eventsInputDirPath, true); + eventsInputDirPath = null; + } catch (IOException e) { + LOG.error("Unable to cleanup: {}", eventsInputDirPath, e); + } + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java new file mode 100644 index 0000000..0b4200c --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/EventUtils.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.hive.util; + +import org.apache.commons.codec.binary.Base64; +import org.apache.commons.io.IOUtils; +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.HiveDRArgs; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.tools.DistCp; +import org.apache.hadoop.tools.DistCpOptions; +import org.apache.hive.hcatalog.api.repl.Command; +import org.apache.hive.hcatalog.api.repl.ReplicationUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStreamReader; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +/** + * Utility class to handle Hive events for data-mirroring. + */ +public class EventUtils { + private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver"; + private static final int TIMEOUT_IN_SECS = 300; + private static final String JDBC_PREFIX = "jdbc:"; + private static final int RETRY_ATTEMPTS = 3; + + private Configuration conf = null; + private String sourceHiveServer2Uri = null; + private String sourceDatabase = null; + private String sourceNN = null; + private String sourceNNKerberosPrincipal = null; + private String targetHiveServer2Uri = null; + private String targetStagingPath = null; + private String targetNN = null; + private String targetNNKerberosPrincipal = null; + private String targetStagingUri = null; + private List<Path> sourceCleanUpList = null; + private List<Path> targetCleanUpList = null; + private static final Logger LOG = LoggerFactory.getLogger(EventUtils.class); + + private FileSystem sourceFileSystem = null; + private FileSystem targetFileSystem = null; + private Connection sourceConnection = null; + private Connection targetConnection = null; + private Statement sourceStatement = null; + private Statement targetStatement = null; + + private List<ReplicationStatus> listReplicationStatus; + + public EventUtils(Configuration conf) { + this.conf = conf; + sourceHiveServer2Uri = conf.get(HiveDRArgs.SOURCE_HS2_URI.getName()); + sourceDatabase = conf.get(HiveDRArgs.SOURCE_DATABASE.getName()); + sourceNN = conf.get(HiveDRArgs.SOURCE_NN.getName()); + sourceNNKerberosPrincipal = conf.get(HiveDRArgs.SOURCE_NN_KERBEROS_PRINCIPAL.getName()); + targetHiveServer2Uri = conf.get(HiveDRArgs.TARGET_HS2_URI.getName()); + targetStagingPath = conf.get(HiveDRArgs.TARGET_STAGING_PATH.getName()) + + File.separator + conf.get(HiveDRArgs.JOB_NAME.getName()); + targetNN = conf.get(HiveDRArgs.TARGET_NN.getName()); + targetNNKerberosPrincipal = conf.get(HiveDRArgs.TARGET_NN_KERBEROS_PRINCIPAL.getName()); + sourceCleanUpList = new ArrayList<Path>(); + targetCleanUpList = new ArrayList<Path>(); + } + + public void setupConnection() throws Exception { + Class.forName(DRIVER_NAME); + DriverManager.setLoginTimeout(TIMEOUT_IN_SECS); + String authTokenString = ";auth=delegationToken"; + //To bypass findbugs check, need to store empty password in Properties. 
+ Properties password = new Properties(); + password.put("password", ""); + String user = ""; + + UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); + if (currentUser != null) { + user = currentUser.getShortUserName(); + } + + if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) + .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { + String connString = JDBC_PREFIX + sourceHiveServer2Uri + "/" + sourceDatabase; + if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()))) { + connString += authTokenString; + } + sourceConnection = DriverManager.getConnection(connString, user, password.getProperty("password")); + sourceStatement = sourceConnection.createStatement(); + } else { + String connString = JDBC_PREFIX + targetHiveServer2Uri + "/" + sourceDatabase; + if (StringUtils.isNotEmpty(conf.get(HiveDRArgs.TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()))) { + connString += authTokenString; + } + targetConnection = DriverManager.getConnection(connString, user, password.getProperty("password")); + targetStatement = targetConnection.createStatement(); + } + } + + public void initializeFS() throws IOException { + LOG.info("Initializing staging directory"); + targetStagingUri = new Path(targetNN, targetStagingPath).toString(); + sourceFileSystem = FileSystem.get(FileUtils.getConfiguration(sourceNN, sourceNNKerberosPrincipal)); + targetFileSystem = FileSystem.get(FileUtils.getConfiguration(targetNN, targetNNKerberosPrincipal)); + } + + private String readEvents(Path eventFileName) throws IOException { + StringBuilder eventString = new StringBuilder(); + BufferedReader in = new BufferedReader(new InputStreamReader(sourceFileSystem.open(eventFileName))); + try { + String line; + while ((line=in.readLine())!=null) { + eventString.append(line); + eventString.append(DelimiterUtils.NEWLINE_DELIM); + } + } catch (Exception e) { + throw new IOException(e); + } finally { + IOUtils.closeQuietly(in); + } + + return eventString.toString(); + } + + public void processEvents(String event) throws Exception { + listReplicationStatus = new ArrayList<ReplicationStatus>(); + String[] eventSplit = event.split(DelimiterUtils.FIELD_DELIM); + String dbName = new String(Base64.decodeBase64(eventSplit[0]), "UTF-8"); + String tableName = new String(Base64.decodeBase64(eventSplit[1]), "UTF-8"); + String exportEventStr; + String importEventStr; + if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) + .equalsIgnoreCase(HiveDRUtils.ExecutionStage.EXPORT.name())) { + exportEventStr = readEvents(new Path(eventSplit[2])); + if (StringUtils.isNotEmpty(exportEventStr)) { + LOG.info("Process the export statements for db {} table {}", dbName, tableName); + processCommands(exportEventStr, dbName, tableName, sourceStatement, sourceCleanUpList, false); + if (!sourceCleanUpList.isEmpty()) { + invokeCopy(sourceCleanUpList); + } + } + } else if (conf.get(HiveDRArgs.EXECUTION_STAGE.getName()) + .equalsIgnoreCase(HiveDRUtils.ExecutionStage.IMPORT.name())) { + importEventStr = readEvents(new Path(eventSplit[3])); + if (StringUtils.isNotEmpty(importEventStr)) { + LOG.info("Process the import statements for db {} table {}", dbName, tableName); + processCommands(importEventStr, dbName, tableName, targetStatement, targetCleanUpList, true); + } + } + } + + public List<ReplicationStatus> getListReplicationStatus() { + return listReplicationStatus; + } + + private void processCommands(String eventStr, String dbName, String tableName, Statement sqlStmt, + List<Path> cleanUpList, boolean 
isImportStatements) + throws SQLException, HiveReplicationException, IOException { + String[] commandList = eventStr.split(DelimiterUtils.NEWLINE_DELIM); + List<Command> deserializeCommand = new ArrayList<Command>(); + for (String command : commandList) { + Command cmd = ReplicationUtils.deserializeCommand(command); + deserializeCommand.add(cmd); + List<String> cleanupLocations = cmd.cleanupLocationsAfterEvent(); + cleanUpList.addAll(getCleanUpPaths(cleanupLocations)); + } + for (Command cmd : deserializeCommand) { + try { + LOG.debug("Executing command : {} : {} ", cmd.getEventId(), cmd.toString()); + executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, 0); + } catch (Exception e) { + // clean up locations before failing. + cleanupEventLocations(sourceCleanUpList, sourceFileSystem); + cleanupEventLocations(targetCleanUpList, targetFileSystem); + throw new HiveReplicationException("Could not process replication command for " + + " DB Name:" + dbName + ", Table Name:" + tableName, e); + } + } + } + + private void executeCommand(Command cmd, String dbName, String tableName, + Statement sqlStmt, boolean isImportStatements, int attempt) + throws HiveReplicationException, SQLException, IOException { + for (final String stmt : cmd.get()) { + executeSqlStatement(cmd, dbName, tableName, sqlStmt, stmt, isImportStatements, attempt); + } + if (isImportStatements) { + addReplicationStatus(ReplicationStatus.Status.SUCCESS, dbName, tableName, cmd.getEventId()); + } + } + + private void executeSqlStatement(Command cmd, String dbName, String tableName, + Statement sqlStmt, String stmt, boolean isImportStatements, int attempt) + throws HiveReplicationException, SQLException, IOException { + try { + sqlStmt.execute(stmt); + } catch (SQLException sqeOuter) { + // Retry if command is retriable. + if (attempt < RETRY_ATTEMPTS && cmd.isRetriable()) { + if (isImportStatements) { + try { + cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), targetFileSystem); + } catch (IOException ioe) { + // Clean up failed before retry on target. Update failure status and return + addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, + tableName, cmd.getEventId()); + throw ioe; + } + } else { + cleanupEventLocations(getCleanUpPaths(cmd.cleanupLocationsPerRetry()), sourceFileSystem); + } + executeCommand(cmd, dbName, tableName, sqlStmt, isImportStatements, ++attempt); + return; // Retry succeeded, return without throwing an exception. + } + // If we reached here, retries have failed. 
+ LOG.error("SQL Exception: {}", sqeOuter); + undoCommand(cmd, dbName, tableName, sqlStmt, isImportStatements); + if (isImportStatements) { + addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, tableName, cmd.getEventId()); + } + throw sqeOuter; + } + } + + private static List<Path> getCleanUpPaths(List<String> cleanupLocations) { + List<Path> cleanupLocationPaths = new ArrayList<Path>(); + for (String cleanupLocation : cleanupLocations) { + cleanupLocationPaths.add(new Path(cleanupLocation)); + } + return cleanupLocationPaths; + } + + private void undoCommand(Command cmd, String dbName, + String tableName, Statement sqlStmt, boolean isImportStatements) + throws SQLException, HiveReplicationException { + if (cmd.isUndoable()) { + try { + List<String> undoCommands = cmd.getUndo(); + LOG.debug("Undo command: {}", StringUtils.join(undoCommands.toArray())); + if (undoCommands.size() != 0) { + for (final String undoStmt : undoCommands) { + sqlStmt.execute(undoStmt); + } + } + } catch (SQLException sqeInner) { + if (isImportStatements) { + addReplicationStatus(ReplicationStatus.Status.FAILURE, dbName, + tableName, cmd.getEventId()); + } + LOG.error("SQL Exception: {}", sqeInner); + throw sqeInner; + } + } + } + + private void addReplicationStatus(ReplicationStatus.Status status, String dbName, String tableName, long eventId) + throws HiveReplicationException { + try { + String drJobName = conf.get(HiveDRArgs.JOB_NAME.getName()); + ReplicationStatus rs = new ReplicationStatus(conf.get(HiveDRArgs.SOURCE_CLUSTER.getName()), + conf.get(HiveDRArgs.TARGET_CLUSTER.getName()), drJobName, dbName, tableName, status, eventId); + listReplicationStatus.add(rs); + } catch (HiveReplicationException hre) { + throw new HiveReplicationException("Could not update replication status store for " + + " EventId:" + eventId + + " DB Name:" + dbName + + " Table Name:" + tableName + + hre.toString()); + } + } + + public void invokeCopy(List<Path> srcStagingPaths) throws Exception { + DistCpOptions options = getDistCpOptions(srcStagingPaths); + DistCp distCp = new DistCp(conf, options); + LOG.info("Started DistCp with source Path: {} \ttarget path: {}", StringUtils.join(srcStagingPaths.toArray()), + targetStagingUri); + Job distcpJob = distCp.execute(); + LOG.info("Distp Hadoop job: {}", distcpJob.getJobID().toString()); + LOG.info("Completed DistCp"); + } + + public DistCpOptions getDistCpOptions(List<Path> srcStagingPaths) { + srcStagingPaths.toArray(new Path[srcStagingPaths.size()]); + + DistCpOptions distcpOptions = new DistCpOptions(srcStagingPaths, new Path(targetStagingUri)); + /* setSyncFolder to false to retain dir structure as in source at the target. If set to true all files will be + copied to the same staging sir at target resulting in DuplicateFileException in DistCp. 
+ */ + + distcpOptions.setSyncFolder(false); + distcpOptions.setBlocking(true); + distcpOptions.setMaxMaps(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAX_MAPS.getName()))); + distcpOptions.setMapBandwidth(Integer.valueOf(conf.get(HiveDRArgs.DISTCP_MAP_BANDWIDTH.getName()))); + return distcpOptions; + } + + public void cleanEventsDirectory() throws IOException { + LOG.info("Cleaning staging directory"); + cleanupEventLocations(sourceCleanUpList, sourceFileSystem); + cleanupEventLocations(targetCleanUpList, targetFileSystem); + } + + private void cleanupEventLocations(List<Path> cleanupList, FileSystem fileSystem) + throws IOException { + for (Path cleanUpPath : cleanupList) { + try { + fileSystem.delete(cleanUpPath, true); + } catch (IOException ioe) { + LOG.error("Cleaning up of staging directory {} failed {}", cleanUpPath, ioe.toString()); + throw ioe; + } + } + + } + + public void closeConnection() throws SQLException { + if (sourceStatement != null) { + sourceStatement.close(); + } + + if (targetStatement != null) { + targetStatement.close(); + } + + if (sourceConnection != null) { + sourceConnection.close(); + } + if (targetConnection != null) { + targetConnection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java new file mode 100644 index 0000000..6bd6319 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/FileUtils.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; + +import java.io.File; +import java.io.IOException; + +/** + * Utility class to validate HDFS files. 
+ */ +public final class FileUtils { + + public static final String DEFAULT_EVENT_STORE_PATH = DRStatusStore.BASE_DEFAULT_STORE_PATH + + File.separator + "Events"; + public static final FsPermission FS_PERMISSION_700 = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); + + + private FileUtils() {} + + public static Configuration getConfiguration(final String writeEP, final String nnKerberosPrincipal) { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", writeEP); + if (StringUtils.isNotEmpty(nnKerberosPrincipal)) { + conf.set("dfs.namenode.kerberos.principal", nnKerberosPrincipal); + } + return conf; + } + + public static void validatePath(final FileSystem fileSystem, final Path basePath) throws IOException { + if (!fileSystem.exists(basePath)) { + throw new IOException("Please create base dir " + fileSystem.getUri() + basePath + + ". Please set group to " + DRStatusStore.getStoreGroup() + + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString()); + } + + if (!fileSystem.getFileStatus(basePath).getPermission().equals(DRStatusStore.DEFAULT_STORE_PERMISSION) + || !fileSystem.getFileStatus(basePath).getGroup().equalsIgnoreCase(DRStatusStore.getStoreGroup())) { + throw new IOException("Base dir " + fileSystem.getUri() + basePath + + " does not have correct ownership/permissions." + + " Please set group to " + DRStatusStore.getStoreGroup() + + " and permissions to " + DRStatusStore.DEFAULT_STORE_PERMISSION.toString()); + } + + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java new file mode 100644 index 0000000..900afe8 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRStatusStore.java @@ -0,0 +1,315 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.falcon.hive.util; + + import org.apache.commons.io.IOUtils; + import org.apache.commons.lang3.StringUtils; + import org.apache.falcon.hive.exception.HiveReplicationException; + import org.apache.hadoop.fs.FSDataOutputStream; + import org.apache.hadoop.fs.FileStatus; + import org.apache.hadoop.fs.FileSystem; + import org.apache.hadoop.fs.LocatedFileStatus; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.fs.RemoteIterator; + import org.apache.hadoop.fs.permission.FsAction; + import org.apache.hadoop.fs.permission.FsPermission; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + import java.io.IOException; + import java.util.ArrayList; + import java.util.Collections; + import java.util.HashMap; + import java.util.Iterator; + import java.util.List; + import java.util.Map; + + /** + * DRStatusStore implementation for Hive. + */ + public class HiveDRStatusStore extends DRStatusStore { + + private static final Logger LOG = LoggerFactory.getLogger(HiveDRStatusStore.class); + private FileSystem fileSystem; + + private static final String DEFAULT_STORE_PATH = BASE_DEFAULT_STORE_PATH + "hiveReplicationStatusStore/"; + private static final FsPermission DEFAULT_STATUS_DIR_PERMISSION = + new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE); + + private static final String LATEST_FILE = "latest.json"; + private static final int FILE_ROTATION_LIMIT = 10; + private static final int FILE_ROTATION_TIME = 86400000; // 1 day + + + public HiveDRStatusStore(FileSystem targetFileSystem) throws IOException { + init(targetFileSystem); + } + + public HiveDRStatusStore(FileSystem targetFileSystem, String group) throws IOException { + HiveDRStatusStore.setStoreGroup(group); + init(targetFileSystem); + } + + private void init(FileSystem targetFileSystem) throws IOException { + this.fileSystem = targetFileSystem; + Path basePath = new Path(BASE_DEFAULT_STORE_PATH); + FileUtils.validatePath(fileSystem, basePath); + + Path storePath = new Path(DEFAULT_STORE_PATH); + if (!fileSystem.exists(storePath)) { + if (!FileSystem.mkdirs(fileSystem, storePath, DEFAULT_STORE_PERMISSION)) { + throw new IOException("mkdir failed for " + DEFAULT_STORE_PATH); + } + } else { + if (!fileSystem.getFileStatus(storePath).getPermission().equals(DEFAULT_STORE_PERMISSION)) { + throw new IOException("Base dir " + DEFAULT_STORE_PATH + " does not have correct permissions. " + + "Please set to " + DEFAULT_STORE_PERMISSION); + } + } + } + + /** + Get all DBs updated by the job and all current table statuses for each DB, then merge the latest + replication status with the previous table statuses. If all statuses are SUCCESS, store SUCCESS + with the largest eventId for the DB; otherwise store FAILURE for the DB with the lowest eventId. + */ + @Override + public void updateReplicationStatus(String jobName, List<ReplicationStatus> statusList) + throws HiveReplicationException { + Map<String, DBReplicationStatus> dbStatusMap = new HashMap<String, DBReplicationStatus>(); + for (ReplicationStatus status : statusList) { + if (!status.getJobName().equals(jobName)) { + String error = "JobName for status does not match current job \"" + jobName + + "\". Status is " + status.toJsonString(); + LOG.error(error); + throw new HiveReplicationException(error); + } + + // init dbStatusMap and tableStatusMap from existing statuses. 
+ if (!dbStatusMap.containsKey(status.getDatabase())) { + DBReplicationStatus dbStatus = getDbReplicationStatus(status.getSourceUri(), status.getTargetUri(), + status.getJobName(), status.getDatabase()); + dbStatusMap.put(status.getDatabase(), dbStatus); + } + + // update existing statuses with new status for db/tables + if (StringUtils.isEmpty(status.getTable())) { // db level replication status. + dbStatusMap.get(status.getDatabase()).updateDbStatus(status); + } else { // table level replication status + dbStatusMap.get(status.getDatabase()).updateTableStatus(status); + } + } + // write to disk + for (Map.Entry<String, DBReplicationStatus> entry : dbStatusMap.entrySet()) { + writeStatusFile(entry.getValue()); + } + } + + @Override + public ReplicationStatus getReplicationStatus(String source, String target, String jobName, String database) + throws HiveReplicationException { + return getReplicationStatus(source, target, jobName, database, null); + } + + + public ReplicationStatus getReplicationStatus(String source, String target, + String jobName, String database, + String table) throws HiveReplicationException { + if (StringUtils.isEmpty(table)) { + return getDbReplicationStatus(source, target, jobName, database).getDatabaseStatus(); + } else { + return getDbReplicationStatus(source, target, jobName, database).getTableStatus(table); + } + } + + @Override + public Iterator<ReplicationStatus> getTableReplicationStatusesInDb(String source, String target, + String jobName, String database) + throws HiveReplicationException { + DBReplicationStatus dbReplicationStatus = getDbReplicationStatus(source, target, jobName, database); + return dbReplicationStatus.getTableStatusIterator(); + } + + @Override + public void deleteReplicationStatus(String jobName, String database) throws HiveReplicationException { + Path deletePath = getStatusDirPath(database, jobName); + try { + if (fileSystem.exists(deletePath)) { + fileSystem.delete(deletePath, true); + } + } catch (IOException e) { + throw new HiveReplicationException("Failed to delete status for Job " + + jobName + " and DB "+ database, e); + } + + } + + private DBReplicationStatus getDbReplicationStatus(String source, String target, String jobName, + String database) throws HiveReplicationException{ + DBReplicationStatus dbReplicationStatus = null; + Path statusDirPath = getStatusDirPath(database, jobName); + // check if database name or jobName can contain chars not allowed by hdfs dir/file naming. + // if yes, use md5 of the same for dir names. prefer to use actual db names for readability. 
+ + try { + if (fileSystem.exists(statusDirPath)) { + dbReplicationStatus = readStatusFile(statusDirPath); + } + if (null == dbReplicationStatus) { + // Init replication state for this database + ReplicationStatus initDbStatus = new ReplicationStatus(source, target, jobName, + database, null, ReplicationStatus.Status.INIT, -1); + dbReplicationStatus = new DBReplicationStatus(initDbStatus); + if (!FileSystem.mkdirs(fileSystem, statusDirPath, DEFAULT_STATUS_DIR_PERMISSION)) { + String error = "mkdir failed for " + statusDirPath.toString(); + LOG.error(error); + throw new HiveReplicationException(error); + } + writeStatusFile(dbReplicationStatus); + } + return dbReplicationStatus; + } catch (IOException e) { + String error = "Failed to get ReplicationStatus for job " + jobName; + LOG.error(error); + throw new HiveReplicationException(error); + } + } + + private Path getStatusDirPath(DBReplicationStatus dbReplicationStatus) { + ReplicationStatus status = dbReplicationStatus.getDatabaseStatus(); + return getStatusDirPath(status.getDatabase(), status.getJobName()); + } + + public Path getStatusDirPath(String database, String jobName) { + return new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/" + jobName); + } + + private void writeStatusFile(DBReplicationStatus dbReplicationStatus) throws HiveReplicationException { + dbReplicationStatus.updateDbStatusFromTableStatuses(); + String statusDir = getStatusDirPath(dbReplicationStatus).toString(); + try { + Path latestFile = new Path(statusDir + "/" + LATEST_FILE); + if (fileSystem.exists(latestFile)) { + Path renamedFile = new Path(statusDir + "/" + + String.valueOf(fileSystem.getFileStatus(latestFile).getModificationTime()) + ".json"); + fileSystem.rename(latestFile, renamedFile); + } + + FSDataOutputStream stream = FileSystem.create(fileSystem, latestFile, DEFAULT_STATUS_DIR_PERMISSION); + stream.write(dbReplicationStatus.toJsonString().getBytes()); + stream.close(); + + } catch (IOException e) { + String error = "Failed to write latest Replication status into dir " + statusDir; + LOG.error(error); + throw new HiveReplicationException(error); + } + + rotateStatusFiles(new Path(statusDir), FILE_ROTATION_LIMIT, FILE_ROTATION_TIME); + } + + public void rotateStatusFiles(Path statusDir, int numFiles, int maxFileAge) throws HiveReplicationException { + + List<String> fileList = new ArrayList<String>(); + long now = System.currentTimeMillis(); + try { + RemoteIterator<LocatedFileStatus> fileIterator = fileSystem.listFiles(statusDir, false); + while (fileIterator.hasNext()) { + fileList.add(fileIterator.next().getPath().toString()); + } + if (fileList.size() > (numFiles+1)) { + // delete some files, as long as they are older than the time. 
+ Collections.sort(fileList); + for (String file : fileList.subList(0, (fileList.size() - numFiles + 1))) { + long modTime = fileSystem.getFileStatus(new Path(file)).getModificationTime(); + if ((now - modTime) > maxFileAge) { + Path deleteFilePath = new Path(file); + if (fileSystem.exists(deleteFilePath)) { + fileSystem.delete(deleteFilePath, false); + } + } + } + } + } catch (IOException e) { + String error = "Failed to rotate status files in dir " + statusDir.toString(); + LOG.error(error); + throw new HiveReplicationException(error); + } + } + + private DBReplicationStatus readStatusFile(Path statusDirPath) throws HiveReplicationException { + try { + Path statusFile = new Path(statusDirPath.toString() + "/" + LATEST_FILE); + if ((!fileSystem.exists(statusDirPath)) || (!fileSystem.exists(statusFile))) { + return null; + } else { + return new DBReplicationStatus(IOUtils.toString(fileSystem.open(statusFile))); + } + } catch (IOException e) { + String error = "Failed to read latest Replication status from dir " + statusDirPath.toString(); + LOG.error(error); + throw new HiveReplicationException(error); + } + } + + public void checkForReplicationConflict(String newSource, String jobName, + String database, String table) throws HiveReplicationException { + try { + Path globPath = new Path(DEFAULT_STORE_PATH + "/" + database.toLowerCase() + "/*/latest.json"); + FileStatus[] files = fileSystem.globStatus(globPath); + for(FileStatus file : files) { + DBReplicationStatus dbFileStatus = new DBReplicationStatus(IOUtils.toString( + fileSystem.open(file.getPath()))); + ReplicationStatus existingJob = dbFileStatus.getDatabaseStatus(); + + if (!(newSource.equals(existingJob.getSourceUri()))) { + throw new HiveReplicationException("Two different sources are attempting to replicate to same db " + + database + ". New Source = " + newSource + + ", Existing Source = " + existingJob.getSourceUri()); + } // two different sources replicating to same DB. Conflict + if (jobName.equals(existingJob.getJobName())) { + continue; + } // same job, no conflict. + + if (StringUtils.isEmpty(table)) { + // When it is DB level replication, two different jobs cannot replicate to same DB + throw new HiveReplicationException("Two different jobs are attempting to replicate to same db " + + database.toLowerCase() + ". New Job = " + jobName + + ", Existing Job = " + existingJob.getJobName()); + } + + /* + At this point, it is different table level jobs replicating from same newSource to same target. This is + allowed as long as the target tables are different. For example, job1 can replicate db1.table1 and + job2 can replicate db1.table2. Both jobs cannot replicate to same table. + */ + for(Map.Entry<String, ReplicationStatus> entry : dbFileStatus.getTableStatuses().entrySet()) { + if (table.equals(entry.getKey())) { + throw new HiveReplicationException("Two different jobs are trying to replicate to same table " + + entry.getKey() + ". 
New job = " + jobName + + ", Existing job = " + existingJob.getJobName()); + } + } + } + } catch (IOException e) { + throw new HiveReplicationException("Failed to read status files for DB " + + database, e); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java new file mode 100644 index 0000000..d9d6ab0 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveDRUtils.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.util.Shell; + +import java.io.File; +import java.io.IOException; +import java.util.List; + +/** + * Hive replication utility class. + */ +public final class HiveDRUtils { + /** + * Enum for Hive replication type. + */ + public enum ReplicationType { + TABLE, + DB + } + + /** + * Enum for hive-dr action type. + */ + public enum ExecutionStage { + IMPORT, + EXPORT, + LASTEVENTS + } + + private static final String ALL_TABLES = "*"; + + public static final String SEPARATOR = File.separator; + + private HiveDRUtils() {} + + public static ReplicationType getReplicationType(List<String> sourceTables) { + return (sourceTables.size() == 1 && sourceTables.get(0).equals(ALL_TABLES)) ? 
ReplicationType.DB + : ReplicationType.TABLE; + } + + public static Configuration getDefaultConf() throws IOException { + Configuration conf = new Configuration(); + conf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml"))); + String delegationToken = getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION"); + if (delegationToken != null) { + conf.set("mapreduce.job.credentials.binary", delegationToken); + conf.set("tez.credentials.path", delegationToken); + } + return conf; + } + + public static String getFilePathFromEnv(String env) { + String path = System.getenv(env); + if (path != null && Shell.WINDOWS) { + // In Windows, file paths are enclosed in \" so remove them here + // to avoid path errors + if (path.charAt(0) == '"') { + path = path.substring(1); + } + if (path.charAt(path.length() - 1) == '"') { + path = path.substring(0, path.length() - 1); + } + } + return path; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java new file mode 100644 index 0000000..ea19f09 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/HiveMetastoreUtils.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hive.hcatalog.api.HCatClient; +import org.apache.hive.hcatalog.api.repl.exim.EximReplicationTaskFactory; +import org.apache.hive.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +/** + * Create hive metastore client for user. 
+ */ +public final class HiveMetastoreUtils { + + private static final Logger LOG = LoggerFactory.getLogger(HiveMetastoreUtils.class); + + private HiveMetastoreUtils() {} + + public static HCatClient initializeHiveMetaStoreClient(String metastoreUri, String metastorePrincipal, + String hive2Principal) throws Exception { + try { + HiveConf hcatConf = createHiveConf(HiveDRUtils.getDefaultConf(), + metastoreUri, metastorePrincipal, hive2Principal); + HCatClient client = HCatClient.create(hcatConf); + return client; + } catch (IOException e) { + throw new Exception("Exception creating HCatClient: " + e.getMessage(), e); + } + } + + private static HiveConf createHiveConf(Configuration conf, String metastoreUrl, String metastorePrincipal, + String hive2Principal) throws IOException { + JobConf jobConf = new JobConf(conf); + String delegationToken = HiveDRUtils.getFilePathFromEnv("HADOOP_TOKEN_FILE_LOCATION"); + if (delegationToken != null) { + Credentials credentials = Credentials.readTokenStorageFile(new File(delegationToken), conf); + jobConf.setCredentials(credentials); + UserGroupInformation.getCurrentUser().addCredentials(credentials); + } + + HiveConf hcatConf = new HiveConf(jobConf, HiveConf.class); + + hcatConf.setVar(HiveConf.ConfVars.METASTOREURIS, metastoreUrl); + hcatConf.setIntVar(HiveConf.ConfVars.METASTORETHRIFTCONNECTIONRETRIES, 3); + hcatConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + hcatConf.set(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY.varname, "false"); + + hcatConf.set(HiveConf.ConfVars.PREEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.POSTEXECHOOKS.varname, ""); + hcatConf.set(HiveConf.ConfVars.HIVE_REPL_TASK_FACTORY.varname, EximReplicationTaskFactory.class.getName()); + if (StringUtils.isNotEmpty(metastorePrincipal)) { + hcatConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, metastorePrincipal); + hcatConf.set(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, "true"); + hcatConf.set(HiveConf.ConfVars.METASTORE_EXECUTE_SET_UGI.varname, "true"); + hcatConf.set("hadoop.rpc.protection", "authentication"); + } + if (StringUtils.isNotEmpty(hive2Principal)) { + hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_KERBEROS_PRINCIPAL.varname, hive2Principal); + hcatConf.set(HiveConf.ConfVars.HIVE_SERVER2_AUTHENTICATION.varname, "kerberos"); + } + + return hcatConf; + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/cbb38ce1/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java ---------------------------------------------------------------------- diff --git a/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java new file mode 100644 index 0000000..bb33772 --- /dev/null +++ b/addons/hivedr/src/main/java/org/apache/falcon/hive/util/ReplicationStatus.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.hive.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.hive.exception.HiveReplicationException; +import org.codehaus.jettison.json.JSONException; +import org.codehaus.jettison.json.JSONObject; + +/** + * Object to store replication status of a DB or a table. + */ +public class ReplicationStatus { + + public static final int INDENT_FACTOR = 4; + private static final String SOURCE = "sourceUri"; + private static final String TARGET = "targetUri"; + private static final String JOB_NAME = "jobName"; + private static final String DATABASE = "database"; + private static final String TABLE = "table"; + private static final String EVENT_ID = "eventId"; + private static final String STATUS_KEY = "status"; + private static final String STATUS_LOG = "statusLog"; + + /** + * Replication Status enum. + */ + public static enum Status { + INIT, + SUCCESS, + FAILURE + } + + private String sourceUri; + private String targetUri; + private String jobName; + private String database; + private String table; + private Status status = Status.SUCCESS; + private long eventId = -1; + private String log; + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + public ReplicationStatus(String sourceUri, String targetUri, String jobName, + String database, String table, + ReplicationStatus.Status status, long eventId) throws HiveReplicationException { + init(sourceUri, targetUri, jobName, database, table, status, eventId, null); + } + + private void init(String source, String target, String job, + String dbName, String tableName, ReplicationStatus.Status replStatus, + long eventNum, String logStr) throws HiveReplicationException { + setSourceUri(source); + setTargetUri(target); + setJobName(job); + setDatabase(dbName); + setTable(tableName); + setStatus(replStatus); + setEventId(eventNum); + setLog(logStr); + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + + public ReplicationStatus(String jsonString) throws HiveReplicationException { + try { + JSONObject object = new JSONObject(jsonString); + Status objectStatus; + try { + objectStatus = ReplicationStatus.Status.valueOf(object.getString(STATUS_KEY).toUpperCase()); + } catch (IllegalArgumentException e1) { + throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus." + + " Invalid status " + object.getString(STATUS_KEY), e1); + } + + init(object.getString(SOURCE), object.getString(TARGET), object.getString(JOB_NAME), + object.getString(DATABASE), object.has(TABLE) ? object.getString(TABLE) : null, + objectStatus, object.has(EVENT_ID) ? object.getLong(EVENT_ID) : -1, + object.has(STATUS_LOG) ? 
object.getString(STATUS_LOG) : null); + } catch (JSONException e) { + throw new HiveReplicationException("Unable to deserialize jsonString to ReplicationStatus ", e); + } + + } + + public String toJsonString() throws HiveReplicationException { + try { + return toJsonObject().toString(INDENT_FACTOR); + } catch (JSONException e) { + throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e); + } + } + + public JSONObject toJsonObject() throws HiveReplicationException { + JSONObject jsonObject = new JSONObject(); + try { + jsonObject.put(SOURCE, this.sourceUri); + jsonObject.put(TARGET, this.targetUri); + jsonObject.put(JOB_NAME, this.jobName); + jsonObject.put(DATABASE, this.database); + if (StringUtils.isNotEmpty(this.table)) { + jsonObject.put(TABLE, this.table); + } + jsonObject.put(STATUS_KEY, this.status.name()); + if (this.eventId > -1) { + jsonObject.put(EVENT_ID, this.eventId); + } else { + jsonObject.put(EVENT_ID, -1); + } + if (StringUtils.isNotEmpty(this.log)) { + jsonObject.put(STATUS_LOG, this.log); + } + return jsonObject; + } catch (JSONException e) { + throw new HiveReplicationException("Unable to serialize ReplicationStatus ", e); + } + } + + public String getSourceUri() { + return this.sourceUri; + } + + public void setSourceUri(String source) throws HiveReplicationException { + validateString(SOURCE, source); + this.sourceUri = source; + } + + public String getTargetUri() { + return this.targetUri; + } + + public void setTargetUri(String target) throws HiveReplicationException { + validateString(TARGET, target); + this.targetUri = target; + } + + public String getJobName() { + return this.jobName; + } + + public void setJobName(String jobName) throws HiveReplicationException { + validateString(JOB_NAME, jobName); + this.jobName = jobName; + } + + public String getDatabase() { + return this.database; + } + + public void setDatabase(String database) throws HiveReplicationException { + validateString(DATABASE, database); + this.database = database.toLowerCase(); + } + + public String getTable() { + return this.table; + } + + public void setTable(String table) { + this.table = (table == null) ? null : table.toLowerCase(); + } + + public Status getStatus() { + return this.status; + } + + public void setStatus(Status status) throws HiveReplicationException { + if (status != null) { + this.status = status; + } else { + throw new HiveReplicationException("Failed to set ReplicationStatus. Input \"" + + STATUS_KEY + "\" cannot be empty"); + } + } + + public long getEventId() { + return this.eventId; + } + + public void setEventId(long eventId) throws HiveReplicationException { + if (eventId > -1) { + this.eventId = eventId; + } + } + + public String getLog() { + return this.log; + } + + public void setLog(String log) { + this.log = log; + } + + private void validateString(String inputName, String input) throws HiveReplicationException { + if (StringUtils.isEmpty(input)) { + throw new HiveReplicationException("Failed to set ReplicationStatus. Input \"" + + inputName + "\" cannot be empty"); + } + } + + public String toString() { + return sourceUri + "\t" + targetUri + "\t" + jobName + "\t" + + database + "\t"+ table + "\t" + status + "\t"+ eventId; + } + +}
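A minimal usage sketch (not part of the patch above) of how HiveDRStatusStore and ReplicationStatus fit together. Only the class and method signatures come from the code in this commit; the FileSystem setup, URIs, job, database and table names below are hypothetical placeholders, and the sketch assumes the base store path already exists on the target cluster with the expected permissions.

import org.apache.falcon.hive.util.HiveDRStatusStore;
import org.apache.falcon.hive.util.ReplicationStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

import java.util.Collections;

public class StatusStoreUsageSketch {
    public static void main(String[] args) throws Exception {
        // File system of the target (DR) cluster; a default Configuration is only a stand-in here.
        FileSystem targetFs = FileSystem.get(new Configuration());
        HiveDRStatusStore store = new HiveDRStatusStore(targetFs);

        // Record a successful table-level replication up to a hypothetical event id 42.
        ReplicationStatus tableStatus = new ReplicationStatus(
                "hcat://source-metastore:9083", "hcat://target-metastore:9083", "drJob",
                "salesdb", "orders", ReplicationStatus.Status.SUCCESS, 42);
        store.updateReplicationStatus("drJob", Collections.singletonList(tableStatus));

        // Read back the rolled-up DB-level status; writeStatusFile() keeps it in
        // <store path>/salesdb/drJob/latest.json and rotates older snapshots.
        ReplicationStatus dbStatus = store.getReplicationStatus(
                "hcat://source-metastore:9083", "hcat://target-metastore:9083", "drJob", "salesdb");
        System.out.println(dbStatus.getStatus() + " at event " + dbStatus.getEventId());
    }
}

Each ReplicationStatus serializes to JSON using the keys defined above (sourceUri, targetUri, jobName, database, table, status, eventId, statusLog); latest.json holds the merged DBReplicationStatus built from those per-table entries.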
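Similarly, a short sketch (also not part of the patch) of obtaining a metastore client through HiveMetastoreUtils. The thrift URI is a hypothetical placeholder; empty principal strings skip the Kerberos settings, and HiveDRUtils.getDefaultConf() expects to run inside an Oozie action where the oozie.action.conf.xml system property is set.

import org.apache.falcon.hive.util.HiveMetastoreUtils;
import org.apache.hive.hcatalog.api.HCatClient;

public class MetastoreClientSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical source metastore; empty principals mean Kerberos is not configured.
        HCatClient sourceClient = HiveMetastoreUtils.initializeHiveMetaStoreClient(
                "thrift://source-metastore:9083", "", "");
        try {
            // List databases as a simple connectivity check.
            System.out.println("Databases: " + sourceClient.listDatabaseNamesByPattern("*"));
        } finally {
            sourceClient.close();
        }
    }
}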
