Hi Erandi,
I spend some time creating a shared LocalClusterUtil class based on
your REST branch. Please include it in your change.
Thanks,
Preston
diff --git a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
index 1d4db4f..6237a9f 100644
--- a/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
+++ b/vxquery-cli/src/main/java/org/apache/vxquery/cli/VXQuery.java
@@ -14,6 +14,21 @@
*/
package org.apache.vxquery.cli;
+import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.logging.LogManager;
+
+import javax.xml.bind.JAXBException;
+
import org.apache.commons.io.FileUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
@@ -23,12 +38,9 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.HttpClientUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.apache.vxquery.app.VXQueryApplication;
+import org.apache.vxquery.app.util.LocalClusterUtil;
import org.apache.vxquery.app.util.RestUtils;
+import org.apache.vxquery.core.VXQueryConfig;
import org.apache.vxquery.rest.request.QueryRequest;
import org.apache.vxquery.rest.request.QueryResultRequest;
import org.apache.vxquery.rest.response.Error;
@@ -39,47 +51,30 @@ import org.kohsuke.args4j.Argument;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
-import javax.xml.bind.JAXBException;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintStream;
-import java.net.InetAddress;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.file.Files;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-import java.util.logging.LogManager;
-
-import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
-
-
public class VXQuery {
private final CmdLineOptions opts;
- private static List<String> timingMessages = new ArrayList<>();
- private static ClusterControllerService clusterControllerService;
- private static NodeControllerService nodeControllerService;
-
private String restIpAddress;
private int restPort;
+ private static LocalClusterUtil vxqueryLocalCluster = new
LocalClusterUtil();
+
/**
* Constructor to use command line options passed.
*
- * @param opts Command line options object
+ * @param opts
+ * Command line options object
*/
- public VXQuery(CmdLineOptions opts) {
+ private VXQuery(CmdLineOptions opts) {
this.opts = opts;
}
/**
* Main method to get command line options and execute query process.
*
- * @param args command line arguments
+ * @param args
+ * command line arguments
*/
public static void main(String[] args) {
LogManager.getLogManager().reset();
@@ -108,8 +103,10 @@ public class VXQuery {
if (opts.restIpAddress == null) {
System.out.println("No REST Ip address given. Creating a local
hyracks cluster");
try {
- restIpAddress = startLocalHyracks();
- restPort = opts.restPort;
+ VXQueryConfig config = loadConfiguration(opts);
+ vxqueryLocalCluster.init(config);
+ restIpAddress = vxqueryLocalCluster.getIpAddress();
+ restPort = vxqueryLocalCluster.getRestPort();
} catch (Exception e) {
System.err.println("Unable to start local hyracks cluster due
to: " + e.getMessage());
return;
@@ -124,20 +121,33 @@ public class VXQuery {
if (opts.restIpAddress == null) {
try {
- stopLocalHyracks();
+ vxqueryLocalCluster.deinit();
} catch (Exception e) {
System.err.println("Error occurred when stopping local
hyracks: " + e.getMessage());
}
}
}
- public void runQueries(List<String> xqFiles) {
+ private VXQueryConfig loadConfiguration(CmdLineOptions opts) {
+ VXQueryConfig vxqConfig = new VXQueryConfig();
+
+ vxqConfig.setAvailableProcessors(opts.availableProcessors);
+ vxqConfig.setJoinHashSize(opts.joinHashSize);
+ vxqConfig.setHdfsConf(opts.hdfsConf);
+ vxqConfig.setMaximumDataSize(opts.maximumDataSize);
+
+ return vxqConfig;
+ }
+
+
+ private void runQueries(List<String> xqFiles) {
for (String xqFile : xqFiles) {
String query;
try {
query = slurp(xqFile);
} catch (IOException e) {
- System.err.println(String.format("Error occurred when reading
XQuery file %s with message: %s", xqFile, e.getMessage()));
+ System.err.println(String.format("Error occurred when reading
XQuery file %s with message: %s", xqFile,
+ e.getMessage()));
continue;
}
@@ -162,8 +172,8 @@ public class VXQuery {
}
if (request.isShowMetrics()) {
- String metrics = String.format("Compile Time:\t%d\nElapsed
Time:\t%d", response.getMetrics().getCompileTime(),
- response.getMetrics().getElapsedTime());
+ String metrics = String.format("Compile Time:\t%d\nElapsed
Time:\t%d",
+ response.getMetrics().getCompileTime(),
response.getMetrics().getElapsedTime());
printField("Query Submission Metrics", metrics);
}
@@ -183,7 +193,7 @@ public class VXQuery {
printField("Runtime Plan", response.getRuntimePlan());
}
-// System.out.println(String.format("Reading results for '%s', result
ID: %d", xqFile, response.getResultId()));
+ // System.out.println(String.format("Reading results for '%s',
result ID: %d", xqFile, response.getResultId()));
QueryResultRequest resultRequest = new
QueryResultRequest(response.getResultId(), response.getRequestId());
resultRequest.setShowMetrics(opts.timing);
sendQueryResultRequest(xqFile, resultRequest, this);
@@ -214,28 +224,29 @@ public class VXQuery {
printField(System.err, String.format("Unable to execute query in
'%s'", xqFile), errorMsg);
}
-
/**
* Submits a query to be executed by the REST API. Will call {@link
#onQueryFailure(String, ErrorResponse)} if any
* error occurs when submitting the query. Else will call {@link
#onQuerySubmitSuccess(String, QueryRequest,
* QueryResponse)} with the {@link QueryResponse}
*
- * @param xqFile .xq file with the query to be executed
- * @param request {@link QueryRequest} instance to be submitted to REST API
- * @param cli cli class instance
+ * @param xqFile
+ * .xq file with the query to be executed
+ * @param request
+ * {@link QueryRequest} instance to be submitted to REST API
+ * @param cli
+ * cli class instance
*/
private static void sendQueryRequest(String xqFile, QueryRequest request,
VXQuery cli) {
URI uri = null;
try {
uri = RestUtils.buildQueryURI(request, cli.restIpAddress,
cli.restPort);
} catch (URISyntaxException e) {
- System.err.println(String.format("Unable to build URI to call REST
API for query: %s", request.getStatement()));
+ System.err.println(
+ String.format("Unable to build URI to call REST API for
query: %s", request.getStatement()));
cli.onQueryFailure(xqFile, null);
}
- CloseableHttpClient httpClient = HttpClients.custom()
- .setConnectionTimeToLive(20,
TimeUnit.SECONDS)
- .build();
+ CloseableHttpClient httpClient =
HttpClients.custom().setConnectionTimeToLive(20, TimeUnit.SECONDS).build();
try {
HttpGet httpGet = new HttpGet(uri);
@@ -246,7 +257,8 @@ public class VXQuery {
String response = RestUtils.readEntity(entity);
if (httpResponse.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
- cli.onQuerySubmitSuccess(xqFile, request,
RestUtils.mapEntity(response, QueryResponse.class, CONTENT_TYPE_JSON));
+ cli.onQuerySubmitSuccess(xqFile, request,
+ RestUtils.mapEntity(response, QueryResponse.class,
CONTENT_TYPE_JSON));
} else {
cli.onQueryFailure(xqFile, RestUtils.mapEntity(response,
ErrorResponse.class, CONTENT_TYPE_JSON));
}
@@ -271,9 +283,7 @@ public class VXQuery {
cli.onQueryFailure(xqFile, null);
}
- CloseableHttpClient httpClient = HttpClients.custom()
- .setConnectionTimeToLive(20,
TimeUnit.SECONDS)
- .build();
+ CloseableHttpClient httpClient =
HttpClients.custom().setConnectionTimeToLive(20, TimeUnit.SECONDS).build();
try {
HttpGet httpGet = new HttpGet(uri);
@@ -284,7 +294,8 @@ public class VXQuery {
String response = RestUtils.readEntity(entity);
if (httpResponse.getStatusLine().getStatusCode() ==
HttpStatus.SC_OK) {
- cli.onQueryResultFetchSuccess(xqFile, request,
RestUtils.mapEntity(response, QueryResultResponse.class, CONTENT_TYPE_JSON));
+ cli.onQueryResultFetchSuccess(xqFile, request,
+ RestUtils.mapEntity(response,
QueryResultResponse.class, CONTENT_TYPE_JSON));
} else {
cli.onQueryFailure(xqFile, RestUtils.mapEntity(response,
ErrorResponse.class, CONTENT_TYPE_JSON));
}
@@ -300,7 +311,6 @@ public class VXQuery {
}
}
-
private static QueryRequest createQueryRequest(CmdLineOptions opts, String
query) {
QueryRequest request = new QueryRequest(query);
request.setCompileOnly(opts.compileOnly);
@@ -317,55 +327,11 @@ public class VXQuery {
}
/**
- * Start local virtual cluster with cluster controller node and node
controller nodes. IP address provided for node
- * controller is localhost. Unassigned ports 39000 and 39001 are used for
client and cluster port respectively.
- * Creates a new Hyracks connection with the IP address and client ports.
- *
- * @throws Exception
- */
- public String startLocalHyracks() throws Exception {
- String localAddress = InetAddress.getLocalHost().getHostAddress();
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = localAddress;
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = localAddress;
- ccConfig.clusterNetPort = 39001;
- ccConfig.httpPort = 39002;
- ccConfig.profileDumpPeriod = 10000;
- ccConfig.appCCMainClass = VXQueryApplication.class.getName();
- ccConfig.appArgs = Arrays.asList("-restPort",
String.valueOf(opts.restPort));
- clusterControllerService = new ClusterControllerService(ccConfig);
- clusterControllerService.start();
-
- NCConfig ncConfig = new NCConfig();
- ncConfig.ccHost = "localhost";
- ncConfig.ccPort = 39001;
- ncConfig.clusterNetIPAddress = localAddress;
- ncConfig.dataIPAddress = localAddress;
- ncConfig.resultIPAddress = localAddress;
- ncConfig.nodeId = "nc";
- ncConfig.ioDevices =
Files.createTempDirectory(ncConfig.nodeId).toString();
- nodeControllerService = new NodeControllerService(ncConfig);
- nodeControllerService.start();
-
- return localAddress;
- }
-
- /**
- * Shuts down the virtual cluster, along with all nodes and node
execution, network and queue managers.
- *
- * @throws Exception
- */
- public void stopLocalHyracks() throws Exception {
- nodeControllerService.stop();
- clusterControllerService.stop();
- }
-
- /**
* Reads the contents of file given in query into a String. The file is
always closed. For XML files UTF-8 encoding
* is used.
*
- * @param query The query with filename to be processed
+ * @param query
+ * The query with filename to be processed
* @return UTF-8 formatted query string
* @throws IOException
*/
@@ -373,16 +339,6 @@ public class VXQuery {
return FileUtils.readFileToString(new File(query), "UTF-8");
}
- /**
- * Save and print out the timing message.
- *
- * @param message
- */
- private static void timingMessage(String message) {
- System.out.println(message);
- timingMessages.add(message);
- }
-
private static void printField(PrintStream out, String field, String
value) {
out.println();
field = field + ":";
@@ -464,9 +420,6 @@ public class VXQuery {
@Option(name = "-available-processors", usage = "Number of available
processors. (default: java's available processors)")
private int availableProcessors = -1;
- @Option(name = "-local-node-controllers", usage = "Number of local
node controllers. (default: 1)")
- private int localNodeControllers = 1;
-
@Argument
private List<String> xqFiles = new ArrayList<>();
}
diff --git
a/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
b/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
new file mode 100644
index 0000000..c7e44d2
--- /dev/null
+++
b/vxquery-rest/src/main/java/org/apache/vxquery/app/util/LocalClusterUtil.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.vxquery.app.util;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.file.Files;
+import java.util.Arrays;
+
+import org.apache.hyracks.api.client.HyracksConnection;
+import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.dataset.IHyracksDataset;
+import org.apache.hyracks.client.dataset.HyracksDataset;
+import org.apache.hyracks.control.cc.ClusterControllerService;
+import org.apache.hyracks.control.common.controllers.CCConfig;
+import org.apache.hyracks.control.common.controllers.NCConfig;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.apache.vxquery.app.VXQueryApplication;
+import org.apache.vxquery.core.VXQuery;
+import org.apache.vxquery.core.VXQueryConfig;
+
+public class LocalClusterUtil {
+ /*
+ * Start local virtual cluster with cluster controller node and node
controller nodes.
+ * IP address provided for node controller is localhost.
+ * Unassigned ports 39000 and 39001 are used for client and cluster port
respectively.
+ */
+ public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 39000;
+ public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 39001;
+ public static final int DEFAULT_HYRACKS_CC_HTTP_PORT = 39002;
+ public static final int DEFAULT_VXQUERY_REST_PORT = 39003;
+
+ // TODO review variable scope after XTest is updated to use the REST
service.
+ public ClusterControllerService clusterControllerService;
+ public NodeControllerService nodeControllerSerivce;
+ public IHyracksClientConnection hcc;
+ public IHyracksDataset hds;
+ public VXQuery restService;
+
+ public void init(VXQueryConfig config) throws Exception {
+ // Cluster controller
+ CCConfig ccConfig = createCCConfig();
+ clusterControllerService = new ClusterControllerService(ccConfig);
+ clusterControllerService.start();
+
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
ccConfig.clientNetPort);
+ hds = new HyracksDataset(hcc, config.getFrameSize(),
config.getAvailableProcessors());
+
+ // Node controller
+ NCConfig ncConfig = createNCConfig();
+ nodeControllerSerivce = new NodeControllerService(ncConfig);
+ nodeControllerSerivce.start();
+
+ hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
ccConfig.clientNetPort);
+
+ // REST controller
+ config.setHyracksClientIp(ccConfig.clientNetIpAddress);
+ config.setHyracksClientPort(ccConfig.clientNetPort);
+ restService = new VXQuery(config);
+ restService.start();
+ }
+
+ protected CCConfig createCCConfig() throws IOException {
+ String localAddress = getIpAddress();
+ CCConfig ccConfig = new CCConfig();
+ ccConfig.clientNetIpAddress = localAddress;
+ ccConfig.clientNetPort = DEFAULT_HYRACKS_CC_CLIENT_PORT;
+ ccConfig.clusterNetIpAddress = localAddress;
+ ccConfig.clusterNetPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+ ccConfig.httpPort = DEFAULT_HYRACKS_CC_HTTP_PORT;
+ ccConfig.profileDumpPeriod = 10000;
+ ccConfig.appCCMainClass = VXQueryApplication.class.getName();
+ ccConfig.appArgs = Arrays.asList("-restPort",
String.valueOf(DEFAULT_VXQUERY_REST_PORT));
+ return ccConfig;
+ }
+
+ protected NCConfig createNCConfig() throws IOException {
+ String localAddress = getIpAddress();
+ NCConfig ncConfig = new NCConfig();
+ ncConfig.ccHost = "localhost";
+ ncConfig.ccPort = DEFAULT_HYRACKS_CC_CLUSTER_PORT;
+ ncConfig.clusterNetIPAddress = localAddress;
+ ncConfig.dataIPAddress = localAddress;
+ ncConfig.resultIPAddress = localAddress;
+ ncConfig.nodeId = "test_node";
+ //TODO: enable index folder as a cli option for on-the-fly indexing
queries
+ ncConfig.ioDevices =
Files.createTempDirectory(ncConfig.nodeId).toString();
+ return ncConfig;
+ }
+
+ public IHyracksClientConnection getHyracksClientConnection() {
+ return hcc;
+ }
+
+ public VXQuery getRestService() {
+ return restService;
+ }
+
+ public void deinit() throws Exception {
+ restService.stop();
+ nodeControllerSerivce.stop();
+ clusterControllerService.stop();
+ }
+
+ public static void main(String[] args) {
+ LocalClusterUtil localClusterUtil = new LocalClusterUtil();
+ VXQueryConfig config = new VXQueryConfig();
+ run(localClusterUtil, config);
+ }
+
+ protected static void run(final LocalClusterUtil localClusterUtil,
VXQueryConfig config) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
+ public void run() {
+ try {
+ localClusterUtil.deinit();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+ try {
+ localClusterUtil.init(config);
+ while (true) {
+ Thread.sleep(10000);
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ System.exit(1);
+ }
+ }
+
+ public String getIpAddress() throws UnknownHostException {
+ return InetAddress.getLocalHost().getHostAddress();
+ }
+
+ public int getRestPort() {
+ return DEFAULT_VXQUERY_REST_PORT;
+ }
+
+ @Deprecated
+ public IHyracksClientConnection getConnection() {
+ return hcc;
+ }
+
+ @Deprecated
+ public IHyracksDataset getDataset() {
+ return hds;
+ }
+
+}
\ No newline at end of file
diff --git a/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQuery.java
b/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQuery.java
index ff06dc6..56d45fe 100644
--- a/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQuery.java
+++ b/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQuery.java
@@ -16,8 +16,23 @@
*/
package org.apache.vxquery.core;
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.xml.DomDriver;
+import static java.util.logging.Level.SEVERE;
+import static org.apache.vxquery.core.Constants.ErrorCodes.NOT_FOUND;
+import static org.apache.vxquery.core.Constants.ErrorCodes.PROBLEM_WITH_QUERY;
+import static org.apache.vxquery.core.Constants.ErrorCodes.UNFORSEEN_PROBLEM;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import
org.apache.hyracks.algebricks.core.algebra.prettyprint.AlgebricksAppendable;
@@ -60,22 +75,8 @@ import org.apache.vxquery.xmlquery.query.Module;
import org.apache.vxquery.xmlquery.query.XMLQueryCompiler;
import org.apache.vxquery.xmlquery.query.XQueryCompilationListener;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.PrintWriter;
-import java.io.StringReader;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import static java.util.logging.Level.SEVERE;
-import static org.apache.vxquery.core.Constants.ErrorCodes.NOT_FOUND;
-import static org.apache.vxquery.core.Constants.ErrorCodes.PROBLEM_WITH_QUERY;
-import static org.apache.vxquery.core.Constants.ErrorCodes.UNFORSEEN_PROBLEM;
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.xml.DomDriver;
/**
* Main class responsible for handling query requests. This class will first
compile, then submit query to hyracks and
@@ -116,13 +117,16 @@ public class VXQuery {
setState(State.STARTING);
try {
- hyracksClientConnection = new
HyracksConnection(vxQueryConfig.getHyracksClientIp(),
vxQueryConfig.getHyracksClientPort());
+ hyracksClientConnection = new
HyracksConnection(vxQueryConfig.getHyracksClientIp(),
+ vxQueryConfig.getHyracksClientPort());
} catch (Exception e) {
- LOGGER.log(SEVERE, String.format("Unable to create a hyracks
client connection to %s:%d", vxQueryConfig.getHyracksClientIp(),
vxQueryConfig.getHyracksClientPort()));
+ LOGGER.log(SEVERE, String.format("Unable to create a hyracks
client connection to %s:%d",
+ vxQueryConfig.getHyracksClientIp(),
vxQueryConfig.getHyracksClientPort()));
throw new VXQueryRuntimeException("Unable to create a hyracks
client connection", e);
}
- LOGGER.log(Level.FINE, String.format("Using hyracks connection to
%s:%d", vxQueryConfig.getHyracksClientIp(),
vxQueryConfig.getHyracksClientPort()));
+ LOGGER.log(Level.FINE, String.format("Using hyracks connection to
%s:%d", vxQueryConfig.getHyracksClientIp(),
+ vxQueryConfig.getHyracksClientPort()));
setState(State.STARTED);
LOGGER.log(Level.INFO, "VXQuery started successfully");
@@ -136,8 +140,9 @@ public class VXQuery {
* Submits a query to hyracks to be run after compiling. Required
intermediate results and metrics are also
* calculated according to the {@link QueryRequest}. Checks if this class
has started before moving further.
*
- * @param request {@link QueryRequest} containing information about the
query to be executed and the merics required
- * along with the results
+ * @param request
+ * {@link QueryRequest} containing information about the query
to be executed and the merics required
+ * along with the results
* @return QueryResponse if no error occurs | ErrorResponse else
*/
public APIResponse execute(final QueryRequest request) {
@@ -161,38 +166,33 @@ public class VXQuery {
} catch (HyracksException e) {
LOGGER.log(Level.SEVERE, String.format("Error occurred when
obtaining NC info: '%s'", e.getMessage()));
return APIResponse.newErrorResponse(request.getRequestId(),
Error.builder().withCode(UNFORSEEN_PROBLEM)
-
.withMessage("Hyracks connection problem: " + e.getMessage())
-
.build());
+ .withMessage("Hyracks connection problem: " +
e.getMessage()).build());
}
// Adding a query compilation listener
- VXQueryCompilationListener listener = new
VXQueryCompilationListener(response, request.isShowAbstractSyntaxTree(),
-
request.isShowTranslatedExpressionTree(),
-
request.isShowOptimizedExpressionTree(),
-
request.isShowRuntimePlan());
+ VXQueryCompilationListener listener = new
VXQueryCompilationListener(response,
+ request.isShowAbstractSyntaxTree(),
request.isShowTranslatedExpressionTree(),
+ request.isShowOptimizedExpressionTree(),
request.isShowRuntimePlan());
Date start = new Date();
// Compiling the XQuery given
- final XMLQueryCompiler compiler = new XMLQueryCompiler(listener,
nodeControllerInfos,
-
request.getFrameSize(),
-
vxQueryConfig.getAvailableProcessors(),
-
vxQueryConfig.getJoinHashSize(),
-
vxQueryConfig.getMaximumDataSize(),
-
vxQueryConfig.getHdfsConf());
- CompilerControlBlock compilerControlBlock = new
CompilerControlBlock(new StaticContextImpl(RootStaticContextImpl.INSTANCE),
-
resultSetId, null);
+ final XMLQueryCompiler compiler = new XMLQueryCompiler(listener,
nodeControllerInfos, request.getFrameSize(),
+ vxQueryConfig.getAvailableProcessors(),
vxQueryConfig.getJoinHashSize(),
+ vxQueryConfig.getMaximumDataSize(),
vxQueryConfig.getHdfsConf());
+ CompilerControlBlock compilerControlBlock = new CompilerControlBlock(
+ new StaticContextImpl(RootStaticContextImpl.INSTANCE),
resultSetId, null);
try {
compiler.compile(null, new StringReader(query),
compilerControlBlock, request.getOptimization(), null);
} catch (AlgebricksException e) {
- LOGGER.log(Level.SEVERE, String.format("Error occurred when
compiling query: '%s' with message: '%s'", query, e.getMessage()));
+ LOGGER.log(Level.SEVERE, String.format("Error occurred when
compiling query: '%s' with message: '%s'",
+ query, e.getMessage()));
return APIResponse.newErrorResponse(request.getRequestId(),
Error.builder().withCode(PROBLEM_WITH_QUERY)
-
.withMessage("Query compilation failure: " + e.getMessage())
-
.build());
+ .withMessage("Query compilation failure: " +
e.getMessage()).build());
} catch (SystemException e) {
- LOGGER.log(Level.SEVERE, String.format("Error occurred when
compiling query: '%s' with message: '%s'", query, e.getMessage()));
+ LOGGER.log(Level.SEVERE, String.format("Error occurred when
compiling query: '%s' with message: '%s'",
+ query, e.getMessage()));
return APIResponse.newErrorResponse(request.getRequestId(),
Error.builder().withCode(PROBLEM_WITH_QUERY)
-
.withMessage("Query compilation failure: " + e.getMessage())
-
.build());
+ .withMessage("Query compilation failure: " +
e.getMessage()).build());
}
if (request.isShowMetrics()) {
@@ -212,8 +212,7 @@ public class VXQuery {
} catch (Exception e) {
LOGGER.log(SEVERE, "Error occurred when submitting job to
hyracks for query: " + query, e);
return APIResponse.newErrorResponse(request.getRequestId(),
Error.builder().withCode(UNFORSEEN_PROBLEM)
-
.withMessage("Error occurred when starting hyracks job")
-
.build());
+ .withMessage("Error occurred when starting hyracks
job").build());
}
if (request.isShowMetrics()) {
@@ -228,9 +227,10 @@ public class VXQuery {
/**
* Returns the query results for a given result set id.
*
- * @param request {@link QueryResultRequest} with result ID required
+ * @param request
+ * {@link QueryResultRequest} with result ID required
* @return Either a {@link QueryResultResponse} if no error occurred |
{@link org.apache.vxquery.rest.response.ErrorResponse}
- * else.
+ * else.
*/
public APIResponse getResult(QueryResultRequest request) {
if (jobContexts.containsKey(request.getResultId())) {
@@ -242,7 +242,8 @@ public class VXQuery {
LOGGER.log(Level.SEVERE, "Error occurred when reading results
for id : " + request.getResultId());
return APIResponse.newErrorResponse(request.getRequestId(),
Error.builder().withCode(UNFORSEEN_PROBLEM)
- .withMessage("Error occurred when reading
results from hyracks for result ID: " + request.getResultId())
+ .withMessage("Error occurred when reading
results from hyracks for result ID: "
+ + request.getResultId())
.build());
}
@@ -252,9 +253,8 @@ public class VXQuery {
return resultResponse;
} else {
- return APIResponse.newErrorResponse(request.getRequestId(),
- Error.builder().withCode(NOT_FOUND)
- .withMessage("No query found for result ID : " +
request.getResultId()).build());
+ return APIResponse.newErrorResponse(request.getRequestId(),
Error.builder().withCode(NOT_FOUND)
+ .withMessage("No query found for result ID : " +
request.getResultId()).build());
}
}
@@ -262,9 +262,12 @@ public class VXQuery {
* Reads results from hyracks given the {@link HyracksJobContext}
containing {@link ResultSetId} and {@link JobId}
* mapping.
*
- * @param jobContext mapoing between the {@link ResultSetId} and
corresponding hyracks {@link JobId}
- * @param resultResponse {@link QueryResultResponse} object to which the
result will be added.
- * @throws Exception IOErrors and etc
+ * @param jobContext
+ * mapoing between the {@link ResultSetId} and corresponding
hyracks {@link JobId}
+ * @param resultResponse
+ * {@link QueryResultResponse} object to which the result will
be added.
+ * @throws Exception
+ * IOErrors and etc
*/
private void readResults(HyracksJobContext jobContext, QueryResultResponse
resultResponse) throws Exception {
int nReaders = 1;
@@ -331,11 +334,8 @@ public class VXQuery {
private boolean showOptimizedExpressionTree;
private boolean showRuntimePlan;
- public VXQueryCompilationListener(QueryResponse response,
- boolean showAbstractSyntaxTree,
- boolean showTranslatedExpressionTree,
- boolean showOptimizedExpressionTree,
- boolean showRuntimePlan) {
+ public VXQueryCompilationListener(QueryResponse response, boolean
showAbstractSyntaxTree,
+ boolean showTranslatedExpressionTree, boolean
showOptimizedExpressionTree, boolean showRuntimePlan) {
this.response = response;
this.showAbstractSyntaxTree = showAbstractSyntaxTree;
this.showTranslatedExpressionTree = showTranslatedExpressionTree;
@@ -368,7 +368,9 @@ public class VXQuery {
try {
response.setRuntimePlan(jobSpec.toJSON().toString());
} catch (IOException e) {
- LOGGER.log(SEVERE, "Error occurred when obtaining runtime
plan from job specification : " + jobSpec.toString(), e);
+ LOGGER.log(SEVERE,
+ "Error occurred when obtaining runtime plan from
job specification : " + jobSpec.toString(),
+ e);
}
}
}
@@ -384,7 +386,7 @@ public class VXQuery {
private StringBuilder appendPrettyPlan(StringBuilder sb, Module
module) {
try {
ILogicalExpressionVisitor<String, Integer> ev = new
VXQueryLogicalExpressionPrettyPrintVisitor(
-
module.getModuleContext());
+ module.getModuleContext());
AlgebricksAppendable buffer = new AlgebricksAppendable();
LogicalOperatorPrettyPrintVisitor v = new
LogicalOperatorPrettyPrintVisitor(buffer, ev);
PlanPrettyPrinter.printPlan(module.getBody(), v, 0);
diff --git
a/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQueryConfig.java
b/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQueryConfig.java
index e2ada77..e8c250b 100644
--- a/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQueryConfig.java
+++ b/vxquery-rest/src/main/java/org/apache/vxquery/core/VXQueryConfig.java
@@ -17,9 +17,6 @@
package org.apache.vxquery.core;
-import java.util.HashMap;
-import java.util.Map;
-
/**
* A class to store default/user specified configurations required at runtime
by the {@link VXQuery} class. These
* configuration will be loaded through a properties file.
@@ -30,6 +27,8 @@ public class VXQueryConfig {
/** Number of available processors. (default: java's available processors)
*/
private int availableProcessors = -1;
+ /** Setting frame size. (default: 65,536) */
+ private int frameSize = 65536;
/** Join hash size in bytes. (default: 67,108,864) */
private long joinHashSize = -1;
/** Maximum possible data size in bytes. (default: 150,323,855,000) */
@@ -87,4 +86,12 @@ public class VXQueryConfig {
public void setHyracksClientIp(String hyracksClientIp) {
this.hyracksClientIp = hyracksClientIp;
}
+
+ public int getFrameSize() {
+ return frameSize;
+ }
+
+ public void setFrameSize(int frameSize) {
+ this.frameSize = frameSize;
+ }
}
diff --git
a/vxquery-rest/src/test/java/org/apache/vxquery/rest/AbstractRestServerTest.java
b/vxquery-rest/src/test/java/org/apache/vxquery/rest/AbstractRestServerTest.java
index bf2cb75..f906358 100644
---
a/vxquery-rest/src/test/java/org/apache/vxquery/rest/AbstractRestServerTest.java
+++
b/vxquery-rest/src/test/java/org/apache/vxquery/rest/AbstractRestServerTest.java
@@ -17,20 +17,12 @@
package org.apache.vxquery.rest;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
import org.apache.vxquery.app.VXQueryApplication;
+import org.apache.vxquery.app.util.LocalClusterUtil;
import org.apache.vxquery.core.VXQuery;
-import org.apache.vxquery.core.VXQueryConfig;
import org.junit.AfterClass;
import org.junit.BeforeClass;
-import java.net.InetAddress;
-import java.nio.file.Files;
-import java.util.Arrays;
-
/**
* Abstract test class to be used for {@link VXQueryApplication} related
tests. These tests are expected to use the REST
* API for querying and fetching results
@@ -39,71 +31,21 @@ import java.util.Arrays;
*/
public class AbstractRestServerTest {
- protected static final int restPort = 8085;
-
- private static ClusterControllerService clusterControllerService;
- private static NodeControllerService nodeControllerService;
-
+ private static LocalClusterUtil vxqueryLocalCluster = new
LocalClusterUtil();
protected static VXQuery vxQuery;
+ protected static String restIpAddress;
+ protected static int restPort;
@BeforeClass
public static void setUp() throws Exception {
- startLocalHyracks();
-
- CCConfig ccConfig = clusterControllerService.getCCConfig();
- VXQueryConfig config = new VXQueryConfig();
- config.setHyracksClientIp(ccConfig.clientNetIpAddress);
- config.setHyracksClientPort(ccConfig.clientNetPort);
- vxQuery = new VXQuery(config);
- vxQuery.start();
- }
-
- /**
- * Start local virtual cluster with cluster controller node and node
controller nodes. IP address provided for node
- * controller is localhost. Unassigned ports 39000 and 39001 are used for
client and cluster port respectively.
- *
- * @throws Exception
- */
- private static void startLocalHyracks() throws Exception {
- String localAddress = InetAddress.getLocalHost().getHostAddress();
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = localAddress;
- ccConfig.clientNetPort = 39000;
- ccConfig.clusterNetIpAddress = localAddress;
- ccConfig.clusterNetPort = 39001;
- ccConfig.httpPort = 39002;
- ccConfig.profileDumpPeriod = 10000;
- ccConfig.appCCMainClass = VXQueryApplication.class.getName();
- ccConfig.appArgs = Arrays.asList("-restPort",
String.valueOf(restPort));
- clusterControllerService = new ClusterControllerService(ccConfig);
- clusterControllerService.start();
-
- NCConfig ncConfig = new NCConfig();
- ncConfig.ccHost = "localhost";
- ncConfig.ccPort = 39001;
- ncConfig.clusterNetIPAddress = localAddress;
- ncConfig.dataIPAddress = localAddress;
- ncConfig.resultIPAddress = localAddress;
- ncConfig.nodeId = "nc";
- ncConfig.ioDevices =
Files.createTempDirectory(ncConfig.nodeId).toString();
- nodeControllerService = new NodeControllerService(ncConfig);
- nodeControllerService.start();
- }
-
-
- /**
- * Shuts down the virtual cluster, along with all nodes and node
execution, network and queue managers.
- *
- * @throws Exception
- */
- private static void stopLocalHyracks() throws Exception {
- nodeControllerService.stop();
- clusterControllerService.stop();
+ vxqueryLocalCluster.init();
+ vxQuery = vxqueryLocalCluster.getRestService();
+ restIpAddress = vxqueryLocalCluster.getIpAddress();
+ restPort = vxqueryLocalCluster.getRestPort();
}
@AfterClass
public static void tearDown() throws Exception {
- vxQuery.stop();
- stopLocalHyracks();
+ vxqueryLocalCluster.deinit();
}
}
diff --git
a/vxquery-rest/src/test/java/org/apache/vxquery/rest/ErrorResponseTest.java
b/vxquery-rest/src/test/java/org/apache/vxquery/rest/ErrorResponseTest.java
index f683609..99e9b9d 100644
--- a/vxquery-rest/src/test/java/org/apache/vxquery/rest/ErrorResponseTest.java
+++ b/vxquery-rest/src/test/java/org/apache/vxquery/rest/ErrorResponseTest.java
@@ -17,6 +17,17 @@
package org.apache.vxquery.rest;
+import static org.apache.vxquery.app.util.RestUtils.buildQueryResultURI;
+import static org.apache.vxquery.app.util.RestUtils.buildQueryURI;
+import static org.apache.vxquery.core.Constants.ErrorCodes.INVALID_INPUT;
+import static org.apache.vxquery.core.Constants.ErrorCodes.NOT_FOUND;
+import static org.apache.vxquery.core.Constants.ErrorCodes.PROBLEM_WITH_QUERY;
+import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
+import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_XML;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -31,17 +42,6 @@ import org.apache.vxquery.rest.response.ErrorResponse;
import org.junit.Assert;
import org.junit.Test;
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.vxquery.app.util.RestUtils.buildQueryResultURI;
-import static org.apache.vxquery.app.util.RestUtils.buildQueryURI;
-import static org.apache.vxquery.core.Constants.ErrorCodes.INVALID_INPUT;
-import static org.apache.vxquery.core.Constants.ErrorCodes.NOT_FOUND;
-import static org.apache.vxquery.core.Constants.ErrorCodes.PROBLEM_WITH_QUERY;
-import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
-import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_XML;
-
/**
* Tests error codes of the possible error responses that can be received for
erroneous queries.
*
@@ -49,61 +49,57 @@ import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_XM
*/
public class ErrorResponseTest extends AbstractRestServerTest {
- private static final String HOST = "localhost";
-
@Test
public void testInvalidInput01() throws Exception {
QueryRequest request = new QueryRequest(" ");
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_JSON,
INVALID_INPUT);
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_XML,
INVALID_INPUT);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, INVALID_INPUT);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, INVALID_INPUT);
}
@Test
public void testInvalidInput02() throws Exception {
QueryRequest request = new QueryRequest("");
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_JSON,
405);
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_XML, 405);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, 405);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, 405);
}
@Test
public void testInvalidQuery01() throws Exception {
QueryRequest request = new QueryRequest("for $x in (1,2,3) return $y");
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_JSON,
PROBLEM_WITH_QUERY);
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_XML,
PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, PROBLEM_WITH_QUERY);
}
@Test
public void testInvalidQuery02() throws Exception {
QueryRequest request = new QueryRequest("for x in (1,2,3) return $x");
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_JSON,
PROBLEM_WITH_QUERY);
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_XML,
PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, PROBLEM_WITH_QUERY);
}
@Test
public void testInvalidQuery03() throws Exception {
QueryRequest request = new QueryRequest("insert nodes <book></book>
into doc(\"abcd.xml\")/books");
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_JSON,
PROBLEM_WITH_QUERY);
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_XML,
PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, PROBLEM_WITH_QUERY);
}
@Test
public void testInvalidQuery04() throws Exception {
QueryRequest request = new QueryRequest("delete nodes /a/b//node()");
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_JSON,
PROBLEM_WITH_QUERY);
- runTest(buildQueryURI(request, HOST, restPort), CONTENT_TYPE_XML,
PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, PROBLEM_WITH_QUERY);
+ runTest(buildQueryURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, PROBLEM_WITH_QUERY);
}
@Test
public void testInvalidResultId() throws Exception {
QueryResultRequest request = new QueryResultRequest(1000);
- runTest(buildQueryResultURI(request, HOST, restPort),
CONTENT_TYPE_JSON, NOT_FOUND);
- runTest(buildQueryResultURI(request, HOST, restPort),
CONTENT_TYPE_XML, NOT_FOUND);
+ runTest(buildQueryResultURI(request, restIpAddress, restPort),
CONTENT_TYPE_JSON, NOT_FOUND);
+ runTest(buildQueryResultURI(request, restIpAddress, restPort),
CONTENT_TYPE_XML, NOT_FOUND);
}
private void runTest(URI uri, String accepts, int expectedStatusCode)
throws Exception {
- CloseableHttpClient httpClient = HttpClients.custom()
- .setConnectionTimeToLive(20,
TimeUnit.SECONDS)
- .build();
+ CloseableHttpClient httpClient =
HttpClients.custom().setConnectionTimeToLive(20, TimeUnit.SECONDS).build();
ErrorResponse errorResponse;
try {
diff --git
a/vxquery-rest/src/test/java/org/apache/vxquery/rest/SuccessResponseTest.java
b/vxquery-rest/src/test/java/org/apache/vxquery/rest/SuccessResponseTest.java
index 70af7a6..f4a40b3 100644
---
a/vxquery-rest/src/test/java/org/apache/vxquery/rest/SuccessResponseTest.java
+++
b/vxquery-rest/src/test/java/org/apache/vxquery/rest/SuccessResponseTest.java
@@ -17,7 +17,12 @@
package org.apache.vxquery.rest;
-import io.netty.handler.codec.http.HttpResponseStatus;
+import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
+import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_XML;
+
+import java.net.URI;
+import java.util.concurrent.TimeUnit;
+
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -35,11 +40,7 @@ import org.apache.vxquery.rest.response.QueryResultResponse;
import org.junit.Assert;
import org.junit.Test;
-import java.net.URI;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_JSON;
-import static
org.apache.vxquery.core.Constants.HttpHeaderValues.CONTENT_TYPE_XML;
+import io.netty.handler.codec.http.HttpResponseStatus;
/**
* This class tests the success responses received for XQueries submitted. i.e
we are submitting correct queries which
@@ -77,7 +78,7 @@ public class SuccessResponseTest extends
AbstractRestServerTest {
}
private void runTest(String contentType, QueryRequest request) throws
Exception {
- URI queryEndpointUri = RestUtils.buildQueryURI(request, "localhost",
restPort);
+ URI queryEndpointUri = RestUtils.buildQueryURI(request, restIpAddress,
restPort);
/*
* ========== Query Response Testing ==========
@@ -217,7 +218,7 @@ public class SuccessResponseTest extends
AbstractRestServerTest {
* @throws Exception
*/
private static QueryResultResponse
getQueryResultResponse(QueryResultRequest resultRequest, String accepts) throws
Exception {
- URI queryResultEndpointUri =
RestUtils.buildQueryResultURI(resultRequest, "localhost", restPort);
+ URI queryResultEndpointUri =
RestUtils.buildQueryResultURI(resultRequest, restIpAddress, restPort);
CloseableHttpClient httpClient = HttpClients.custom()
.setConnectionTimeToLive(20,
TimeUnit.SECONDS)
diff --git a/vxquery-xtest/pom.xml b/vxquery-xtest/pom.xml
index a00bec2..a32d913 100644
--- a/vxquery-xtest/pom.xml
+++ b/vxquery-xtest/pom.xml
@@ -144,6 +144,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.vxquery</groupId>
+ <artifactId>apache-vxquery-rest</artifactId>
+ <version>0.7-SNAPSHOT</version>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
diff --git
a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
index 0e5b481..e088b0d 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/TestClusterUtil.java
@@ -17,83 +17,47 @@
package org.apache.vxquery.xtest;
+import java.io.IOException;
+
import org.apache.hyracks.api.client.HyracksConnection;
import org.apache.hyracks.client.dataset.HyracksDataset;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.common.controllers.CCConfig;
-import org.apache.hyracks.control.common.controllers.NCConfig;
-import org.apache.hyracks.control.nc.NodeControllerService;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
+import org.apache.vxquery.app.util.LocalClusterUtil;
+import org.apache.vxquery.core.VXQueryConfig;
public class TestClusterUtil {
- private static final int CLIENT_NET_PORT = 39000;
- private static final int CLUSTER_NET_PORT = 39001;
- private static final int PROFILE_DUMP_PERIOD = 10000;
- private static final String CC_HOST = "localhost";
- private static final String NODE_ID = "nc1";
- private static final String IO_DEVICES = "target/tmp/indexFolder";
-
private static HyracksConnection hcc;
private static HyracksDataset hds;
+ public static final LocalClusterUtil localClusterUtil = new
LocalClusterUtil();
+
private TestClusterUtil() {
}
- public static CCConfig createCCConfig() throws UnknownHostException {
- String publicAddress = InetAddress.getLocalHost().getHostAddress();
- CCConfig ccConfig = new CCConfig();
- ccConfig.clientNetIpAddress = publicAddress;
- ccConfig.clientNetPort = CLIENT_NET_PORT;
- ccConfig.clusterNetIpAddress = publicAddress;
- ccConfig.clusterNetPort = CLUSTER_NET_PORT;
- ccConfig.profileDumpPeriod = PROFILE_DUMP_PERIOD;
- return ccConfig;
- }
+ private static VXQueryConfig loadConfiguration(XTestOptions opts) {
+ VXQueryConfig vxqConfig = new VXQueryConfig();
+
+ vxqConfig.setAvailableProcessors(opts.threads);
+ vxqConfig.setFrameSize(opts.frameSize);
+ vxqConfig.setHdfsConf(opts.hdfsConf);
- public static NCConfig createNCConfig() throws UnknownHostException {
- String publicAddress = InetAddress.getLocalHost().getHostAddress();
- NCConfig ncConfig1 = new NCConfig();
- ncConfig1.ccHost = CC_HOST;
- ncConfig1.ccPort = CLUSTER_NET_PORT;
- ncConfig1.clusterNetIPAddress = publicAddress;
- ncConfig1.dataIPAddress = publicAddress;
- ncConfig1.resultIPAddress = publicAddress;
- ncConfig1.nodeId = NODE_ID;
- ncConfig1.ioDevices = IO_DEVICES;
- return ncConfig1;
+ return vxqConfig;
}
- public static ClusterControllerService startCC(XTestOptions opts) throws
IOException {
- CCConfig ccConfig = createCCConfig();
- File outDir = new File("target/ClusterController");
- outDir.mkdirs();
- File ccRoot = File.createTempFile(TestRunner.class.getName(), ".data",
outDir);
- ccRoot.delete();
- ccRoot.mkdir();
- ccConfig.ccRoot = ccRoot.getAbsolutePath();
+ public static void startCluster(XTestOptions opts, LocalClusterUtil
localClusterUtil) throws IOException {
try {
- ClusterControllerService cc = new
ClusterControllerService(ccConfig);
- cc.start();
- hcc = new HyracksConnection(ccConfig.clientNetIpAddress,
ccConfig.clientNetPort);
- hds = new HyracksDataset(hcc, opts.frameSize, opts.threads);
- return cc;
+ VXQueryConfig config = loadConfiguration(opts);
+ localClusterUtil.init(config);
+ hcc = (HyracksConnection) localClusterUtil.getConnection();
+ hds = (HyracksDataset) localClusterUtil.getDataset();
} catch (Exception e) {
throw new IOException(e);
}
-
}
- public static NodeControllerService startNC() throws IOException {
- NCConfig ncConfig = createNCConfig();
+ public static void stopCluster(LocalClusterUtil localClusterUtil) throws
IOException {
try {
- NodeControllerService nc = new NodeControllerService(ncConfig);
- nc.start();
- return nc;
+ localClusterUtil.deinit();
} catch (Exception e) {
throw new IOException(e);
}
@@ -107,13 +71,4 @@ public class TestClusterUtil {
return hds;
}
- public static void stopCluster(ClusterControllerService cc,
NodeControllerService nc) throws IOException {
- try {
- nc.stop();
- cc.stop();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
-
}
diff --git a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
index 5aae691..df7a71d 100644
--- a/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
+++ b/vxquery-xtest/src/main/java/org/apache/vxquery/xtest/XTest.java
@@ -16,10 +16,6 @@
*/
package org.apache.vxquery.xtest;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
-import org.mortbay.jetty.Server;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -28,6 +24,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.mortbay.jetty.Server;
+
public class XTest {
private XTestOptions opts;
private Server server;
@@ -36,8 +34,6 @@ public class XTest {
private TestRunnerFactory trf;
private int count;
private int finishCount;
- private static NodeControllerService nc;
- private static ClusterControllerService cc;
XTest(XTestOptions opts) {
this.opts = opts;
@@ -81,8 +77,7 @@ public class XTest {
}
}
});
- cc = TestClusterUtil.startCC(opts);
- nc = TestClusterUtil.startNC();
+ TestClusterUtil.startCluster(opts, TestClusterUtil.localClusterUtil);
trf = new TestRunnerFactory(opts);
trf.registerReporters(reporters);
TestCaseFactory tcf = new TestCaseFactory(trf, eSvc, opts);
@@ -104,7 +99,7 @@ public class XTest {
r.close();
}
try {
- TestClusterUtil.stopCluster(cc, nc);
+ TestClusterUtil.stopCluster(TestClusterUtil.localClusterUtil);
} catch (IOException e) {
e.printStackTrace();
}
diff --git
a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
index 1e2dcf6..8f77de4 100644
---
a/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
+++
b/vxquery-xtest/src/test/java/org/apache/vxquery/xtest/AbstractXQueryTest.java
@@ -22,8 +22,6 @@ import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.control.cc.ClusterControllerService;
-import org.apache.hyracks.control.nc.NodeControllerService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -38,8 +36,6 @@ public abstract class AbstractXQueryTest {
private TestRunner tr;
private static MiniDFS dfs;
private final static String TMP = "target/tmp";
- private static NodeControllerService nc;
- private static ClusterControllerService cc;
protected abstract XTestOptions getTestOptions();
@@ -92,8 +88,7 @@ public abstract class AbstractXQueryTest {
@BeforeClass
public static void setup() throws IOException {
- cc = TestClusterUtil.startCC(getDefaultTestOptions());
- nc = TestClusterUtil.startNC();
+ TestClusterUtil.startCluster(getDefaultTestOptions(),
TestClusterUtil.localClusterUtil);
setupFS();
}
@@ -116,7 +111,7 @@ public abstract class AbstractXQueryTest {
@AfterClass
public static void shutdown() throws IOException {
removeFS();
- TestClusterUtil.stopCluster(cc, nc);
+ TestClusterUtil.stopCluster(TestClusterUtil.localClusterUtil);
}
public static void removeFS() throws IOException {