Pritom Ahmed has uploaded a new change for review. https://asterix-gerrit.ics.uci.edu/350
Change subject: Periodically saving JSON data from admin console API to a temporary dataset. ...................................................................... Periodically saving JSON data from admin console API to a temporary dataset. The following commits from your working branch will be included: commit ccd6b40d58cc50dfad47a5f999bf20a5eebcb60d Author: Pritom Ahmed <[email protected]> Date: Sun Aug 9 17:02:47 2015 -0700 JSON data from hyracks admin console API is saved in Temporary Dataset. We are now saving JSON data from admin console API in temporary dataset every 5 seconds. Change-Id: I6cb186abda6a14d9eb866259d459ce5b5e855be8 --- M asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java M asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java A asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java A asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java 4 files changed, 391 insertions(+), 12 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/50/350/1 diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java index 8e633a9..74442e1 100644 --- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java +++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/AsterixGlobalRecoveryManager.java @@ -14,13 +14,9 @@ */ package edu.uci.ics.asterix.hyracks.bootstrap; -import java.util.List; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - import edu.uci.ics.asterix.common.api.IClusterEventsSubscriber; import edu.uci.ics.asterix.common.api.IClusterManagementWork; +import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState; import edu.uci.ics.asterix.common.api.IClusterManagementWorkResponse; import edu.uci.ics.asterix.common.config.DatasetConfig.DatasetType; import edu.uci.ics.asterix.common.config.DatasetConfig.ExternalDatasetTransactionState; @@ -31,16 +27,16 @@ import edu.uci.ics.asterix.metadata.MetadataTransactionContext; import edu.uci.ics.asterix.metadata.bootstrap.MetadataConstants; import edu.uci.ics.asterix.metadata.declared.AqlMetadataProvider; -import edu.uci.ics.asterix.metadata.entities.Dataset; -import edu.uci.ics.asterix.metadata.entities.Dataverse; -import edu.uci.ics.asterix.metadata.entities.ExternalDatasetDetails; -import edu.uci.ics.asterix.metadata.entities.ExternalFile; -import edu.uci.ics.asterix.metadata.entities.Index; +import edu.uci.ics.asterix.metadata.entities.*; import edu.uci.ics.asterix.om.util.AsterixClusterProperties; -import edu.uci.ics.asterix.common.api.IClusterManagementWork.ClusterState; import edu.uci.ics.hyracks.api.client.HyracksConnection; import edu.uci.ics.hyracks.api.job.JobId; import edu.uci.ics.hyracks.api.job.JobSpecification; + +import java.util.List; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; public class AsterixGlobalRecoveryManager implements IClusterEventsSubscriber { @@ -58,6 +54,8 @@ public Set<IClusterManagementWork> notifyNodeFailure(Set<String> deadNodeIds) { state = AsterixClusterProperties.INSTANCE.getState(); AsterixClusterProperties.INSTANCE.setGlobalRecoveryCompleted(false); +// GatherJSONData gatherJSONData = new GatherJSONData(/*GatherJSONData.NODE, GatherJSONData.NODE_FAILURE, deadNodeIds*/); +// (new Thread(gatherJSONData)).start(); return null; } @@ -191,6 +189,18 @@ }); state = newState; recoveryThread.start(); + + /* TODO: 1. check if all the node clusters have joined/started */ + //Cluster status is changed to ACTIVE so notify to gather JSON data of adminconsole + while(!recoveryThread.isAlive()) { + try { + wait(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + GatherJSONData gatherJSONData = new GatherJSONData(); + (new Thread(gatherJSONData)).start(); } return null; } diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java index a93461d..39c5ffb 100644 --- a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java +++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/CCApplicationEntryPoint.java @@ -92,7 +92,7 @@ setupFeedServer(externalProperties); feedServer.start(); - centralFeedManager = CentralFeedManager.getInstance(); + centralFeedManager = CentralFeedManager.getInstance(); centralFeedManager.start(); waitUntilServerStart(webServer); @@ -104,6 +104,7 @@ ClusterManager.INSTANCE.registerSubscriber(AsterixGlobalRecoveryManager.INSTANCE); ccAppCtx.addClusterLifecycleListener(ClusterLifecycleListener.INSTANCE); + ccAppCtx.addJobLifecycleListener(JobLifeCycleListener.INSTANCE); } private void waitUntilServerStart(AbstractLifeCycle webServer) throws Exception { diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java new file mode 100644 index 0000000..823fd37 --- /dev/null +++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/GatherJSONData.java @@ -0,0 +1,330 @@ +package edu.uci.ics.asterix.hyracks.bootstrap; + +import edu.uci.ics.asterix.common.config.GlobalConfig; +import org.apache.commons.httpclient.*; +import org.apache.commons.httpclient.methods.GetMethod; +import org.apache.commons.httpclient.methods.PostMethod; +import org.apache.commons.httpclient.methods.StringRequestEntity; +import org.apache.commons.httpclient.params.HttpMethodParams; +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.util.EntityUtils; +import org.json.JSONObject; + +import java.io.*; +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Level; + +/** + * Created by pritom on 7/14/15. + */ + +public class GatherJSONData implements Runnable { + + public static final String JOB_RUN = "job-run"; + public static final String JOB_ACTIVITY_GRAPH = "job-activity-graph"; + public static final String NODE = "node"; + public static final String JOBS = "jobs"; + public static final String NODES = "nodes"; + public static final String NODE_FAILURE = "failure"; + public static final String NODE_JOIN = "join"; + + private static final String DATATYPE = "TempJSONDataType"; + private static final String DATASET_NAME = "TempJSONDataSet"; + private static final String DATAVERSE_NAME = "TempJSONData"; + + private static boolean ddlCreated = false; + + public GatherJSONData() { + } + + @Override + public void run() { +// running = true; + /*If its running for the first time then we need to run ddls*/ + if (!ddlCreated) { + try { + String ddlString = "drop dataverse " + DATAVERSE_NAME + " if exists;\n" + + "create dataverse "+ DATAVERSE_NAME + ";\n" + + "use dataverse " + DATAVERSE_NAME + ";\n" + + "\n" + + "create type " + DATATYPE + " as closed {\n" + + " id:uuid,\n" + + " typ: string,\n" + + " tpid: string,\n" + + " timestamp: datetime,\n" + + " data: string\n" + + "}\n" + + "\n" + + "create temporary dataset " + DATASET_NAME + "(" + DATATYPE + ")\n" + + "primary key id autogenerated;"; + + executeDDL(ddlString); + ddlCreated = true; + + } catch (Exception e) { + e.printStackTrace(); + } + } + + while(true) { + fetchJSONData(); + try { + Thread.sleep(10000); /* Waiting for 10 seconds */ + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + testQuery(); + } + } + + private static void testQuery() { + /*Query*/ + String queryString = "use dataverse " + DATAVERSE_NAME + ";\n" + + "\n" + + " for $tdata in dataset " + DATASET_NAME + "\n" + + " return $tdata;"; + try { + writeOutputToFile(new File("/home/pritom/query-result-" + System.currentTimeMillis() + ".txt"), executeQuery(queryString)); + } catch (Exception e) { + e.printStackTrace(); + } + } + + private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception { + byte[] buffer = new byte[10240]; + int len; + try (FileOutputStream out = new FileOutputStream(actualFile)) { + while ((len = resultStream.read(buffer)) != -1) { + out.write(buffer, 0, len); + } + } + } + + private void executeDDL(String ddlString) throws Exception { + final String url = "http://localhost:19002/ddl"; + + // Create a method instance. + PostMethod method = new PostMethod(url); + method.setRequestEntity(new StringRequestEntity(ddlString)); + + // Execute the method. + executeHttpMethod(method, ddlString); + } + + private void fetchJSONData() { + DefaultHttpClient httpclient = new DefaultHttpClient(); + HttpHost target = new HttpHost("127.0.0.1", 8888, "http"); + String data = ""; + try { + HttpGet nodeInfoRequest = new HttpGet("/rest/nodes/"); + HttpResponse nodeInfoResponse = httpclient.execute(target, nodeInfoRequest); + HttpEntity nodeEntity = nodeInfoResponse.getEntity(); + + if (nodeEntity != null) { + data = EntityUtils.toString(nodeEntity); + insertIntoTemporaryDatabase(data, NODES, ""); + } + + } catch (Exception e) { + e.printStackTrace(); + } finally { + httpclient.getConnectionManager().shutdown(); + } + + try { + JSONObject jsonObject = new JSONObject(data); + org.json.JSONArray nodes = jsonObject.getJSONArray("result"); + for (int i = 0; i < nodes.length(); i++) { + String nodeId = nodes.getJSONObject(i).getString("node-id"); + fetchNodeData(nodeId, NODE_JOIN); + } + } catch (Exception e) { + e.printStackTrace(); + } + + httpclient = new DefaultHttpClient(); + data = ""; + try { + HttpGet jobRunRequest = new HttpGet("/rest/jobs/"); + HttpResponse jobRunResponse = httpclient.execute(target, jobRunRequest); + HttpEntity jobRunEntity = jobRunResponse.getEntity(); + + if (jobRunEntity != null) { + data = EntityUtils.toString(jobRunEntity); + insertIntoTemporaryDatabase(data, JOBS, ""); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + httpclient.getConnectionManager().shutdown(); + } + + try { + JSONObject jsonObject = new JSONObject(data); + org.json.JSONArray nodes = jsonObject.getJSONArray("result"); + for (int i = 0; i < nodes.length(); i++) { + String nodeId = nodes.getJSONObject(i).getString("job-id"); + fetchJobData(nodeId); + } + } catch (Exception e) { + e.printStackTrace(); + } + } + + public void fetchNodeData(String nodeId, String eventType) { + DefaultHttpClient httpclient = new DefaultHttpClient(); + try { + HttpHost target = new HttpHost("127.0.0.1", 8888, "http"); + HttpGet nodeInfoRequest = new HttpGet("/rest/nodes/" + nodeId); + HttpResponse nodeInfoResponse = httpclient.execute(target, nodeInfoRequest); + HttpEntity nodeEntity = nodeInfoResponse.getEntity(); + + if (nodeEntity != null) { + String data = EntityUtils.toString(nodeEntity); + insertIntoTemporaryDatabase(data, NODE, nodeId); + } + } catch (Exception e) { + e.printStackTrace(); + } finally { + httpclient.getConnectionManager().shutdown(); + } + } + + /* TODO: Decide whether activity graph and job run should be separated so that they can be called at different stage of + the job. For example, job run called after job termination and job activity called after job creation or job start*/ + public void fetchJobData(String jobID) { + DefaultHttpClient httpclient = new DefaultHttpClient(); + try { + HttpHost target = new HttpHost("127.0.0.1", 8888, "http"); + HttpGet jobRunRequest = new HttpGet("/rest/jobs/" + jobID + "/job-run"); + HttpResponse jobRunResponse = httpclient.execute(target, jobRunRequest); + HttpEntity jobRunEntity = jobRunResponse.getEntity(); + + if (jobRunEntity != null) { + insertIntoTemporaryDatabase(EntityUtils.toString(jobRunEntity), JOB_RUN, jobID); + } + + HttpGet jobActivityGraphRequest = new HttpGet("/rest/jobs/" + jobID + "/job-activity-graph"); + HttpResponse jobActivityGraphResponse = httpclient.execute(target, jobActivityGraphRequest); + HttpEntity jobActivityGraphEntity = jobActivityGraphResponse.getEntity(); + + if (jobActivityGraphEntity != null) { + insertIntoTemporaryDatabase(EntityUtils.toString(jobActivityGraphEntity), JOB_ACTIVITY_GRAPH, jobID); + } + + } catch (Exception e) { + e.printStackTrace(); + } finally { + httpclient.getConnectionManager().shutdown(); + } + } + + public void insertIntoTemporaryDatabase(String data, String type, String typeId) throws Exception { + /* Check if data is empty */ + JSONObject object = new JSONObject(data); + if ("{}".equals(object.get("result").toString())) { + return; + } + + Map<String, String> functions = new HashMap<>(); + functions.put("timestamp" , "datetime(\"" + getCurrentDateTime() + "\")"); + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("typ", type); + jsonObject.put("tpid", typeId); + jsonObject.put("data", data); + + StringBuilder builder = new StringBuilder(); + builder.append("use dataverse " + DATAVERSE_NAME + ";" + "\n"); + builder.append("insert into dataset " + DATASET_NAME + " "); + builder.append(" (" + processJSON(jsonObject, functions) + ")"); + builder.append(";"); + + executeUpdate(builder.toString()); + } + + private String processJSON(JSONObject jsonObject, Map<String, String> functions) { + String str = jsonObject.toString(); + int indexOfClosingBrace = str.lastIndexOf("}"); + str = (String) str.subSequence(0, indexOfClosingBrace); + str += ","; + for (String key : functions.keySet()) { + str += "\"" + key + "\":" + functions.get(key); + str += ","; + } + str = str.substring(0, str.length()-1); + str += "}"; + return str; + } + + private String getCurrentDateTime() { + DateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd"); + DateFormat dateFormat2 = new SimpleDateFormat("HH:mm:ss"); + Date date = new Date(); + return dateFormat1.format(date) + "T" + dateFormat2.format(date); + } + + /*use dataverse TempJSONData; + + for $tdata in dataset TempJSONDataSet + return $tdata;*/ + + + // To execute Update statements + // Insert and Delete statements are executed here + public static void executeUpdate(String str) throws Exception { + final String url = "http://localhost:19002/update"; + // Create a method instance. + PostMethod method = new PostMethod(url); + method.setRequestEntity(new StringRequestEntity(str)); + // Execute the method. + executeHttpMethod(method, str); + } + + // Executes Query and returns results as JSONArray + public static InputStream executeQuery(String str) throws Exception { + final String url = "http://localhost:19002/query"; + + // Create a method instance. + GetMethod method = new GetMethod(url); + method.setQueryString(new NameValuePair[] { new NameValuePair("query", str) }); + method.setRequestHeader("Accept", "application/json"); + + // Provide custom retry handler is necessary + method.getParams().setParameter(HttpMethodParams.RETRY_HANDLER, new DefaultHttpMethodRetryHandler(3, false)); + executeHttpMethod(method, str); + return method.getResponseBodyAsStream(); + } + + private static int executeHttpMethod(HttpMethod method, String ddlString) throws Exception { + HttpClient client = new HttpClient(); + int statusCode; + try { + statusCode = client.executeMethod(method); + } catch (Exception e) { + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, e.getMessage(), e); + e.printStackTrace(); + throw e; + } + if (statusCode != HttpStatus.SC_OK) { + String errorBody = method.getResponseBodyAsString(); + JSONObject result = new JSONObject(errorBody); + String[] errors = { result.getJSONArray("error-code").getString(0), result.getString("summary"), + result.getString("stacktrace") }; + GlobalConfig.ASTERIX_LOGGER.log(Level.SEVERE, errors[2]); + throw new Exception("HTTP operation failed: " + errors[0] + "\nSTATUS LINE: " + method.getStatusLine() + + "\nSUMMARY: " + errors[1] + "\nSTACKTRACE: " + errors[2]); + } + return statusCode; + } + +} diff --git a/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java new file mode 100644 index 0000000..624f0ec --- /dev/null +++ b/asterix-app/src/main/java/edu/uci/ics/asterix/hyracks/bootstrap/JobLifeCycleListener.java @@ -0,0 +1,38 @@ +package edu.uci.ics.asterix.hyracks.bootstrap; + +import edu.uci.ics.hyracks.api.exceptions.HyracksException; +import edu.uci.ics.hyracks.api.job.IActivityClusterGraphGeneratorFactory; +import edu.uci.ics.hyracks.api.job.IJobLifecycleListener; +import edu.uci.ics.hyracks.api.job.JobId; +import org.apache.http.util.EntityUtils; + +import java.io.FileNotFoundException; +import java.io.PrintWriter; +import java.io.UnsupportedEncodingException; + +/** + * Created by pritom on 7/20/15. + */ + +/* TODO: 2. Then periodically gather various JSON data */ + +public class JobLifeCycleListener implements IJobLifecycleListener { + + public static JobLifeCycleListener INSTANCE = new JobLifeCycleListener(); + + @Override + public void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory iActivityClusterGraphGeneratorFactory) throws HyracksException { +// GatherJSONData.fetchJobData(jobId); + } + + @Override + public void notifyJobStart(JobId jobId) throws HyracksException { +// GatherJSONData.fetchJobData(jobId); + } + + @Override + public void notifyJobFinish(JobId jobId) throws HyracksException { +// GatherJSONData gatherJSONData = new GatherJSONData("job", jobId); +// (new Thread(gatherJSONData)).start(); + } +} -- To view, visit https://asterix-gerrit.ics.uci.edu/350 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: newchange Gerrit-Change-Id: I6cb186abda6a14d9eb866259d459ce5b5e855be8 Gerrit-PatchSet: 1 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Pritom Ahmed <[email protected]>
