http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java index ff30df3..b5fbcf5 100644 --- a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java +++ b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/aql/TestExecutor.java @@ -32,12 +32,15 @@ import java.io.StringWriter; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.Inet4Address; +import java.net.URI; +import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -72,6 +75,7 @@ import org.apache.http.impl.client.HttpClients; import org.apache.http.impl.client.StandardHttpRequestRetryHandler; import org.apache.http.util.EntityUtils; import org.apache.hyracks.util.StorageUtil; +import org.junit.Assert; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.DeserializationFeature; @@ -91,12 +95,21 @@ public class TestExecutor { private static final long MAX_URL_LENGTH = 2000l; private static final Pattern JAVA_BLOCK_COMMENT_PATTERN = Pattern.compile("/\\*.*\\*/", Pattern.MULTILINE | Pattern.DOTALL); + private static final Pattern JAVA_LINE_COMMENT_PATTERN = Pattern.compile("//.*$", Pattern.MULTILINE); + private static final Pattern SHELL_LINE_COMMENT_PATTERN = Pattern.compile("#.*$", Pattern.MULTILINE); private static final Pattern REGEX_LINES_PATTERN = Pattern.compile("^(-)?/(.*)/([im]*)$"); private static final Pattern POLL_TIMEOUT_PATTERN = Pattern.compile("polltimeoutsecs=(\\d+)(\\D|$)", Pattern.MULTILINE); private static final Pattern POLL_DELAY_PATTERN = Pattern.compile("polldelaysecs=(\\d+)(\\D|$)", Pattern.MULTILINE); + private static final Pattern HANDLE_VARIABLE_PATTERN = Pattern.compile("handlevariable=(\\w+)"); + private static final Pattern VARIABLE_REF_PATTERN = Pattern.compile("\\$(\\w+)"); + public static final int TRUNCATE_THRESHOLD = 16384; + public static final String DELIVERY_ASYNC = "async"; + public static final String DELIVERY_DEFERRED = "deferred"; + public static final String DELIVERY_IMMEDIATE = "immediate"; + private static Method managixExecuteMethod = null; private static final HashMap<Integer, ITestServer> runningTestServers = new HashMap<>(); @@ -376,8 +389,13 @@ public class TestExecutor { // For tests where you simply want the byte-for-byte output. private static void writeOutputToFile(File actualFile, InputStream resultStream) throws Exception { - if (!actualFile.getParentFile().mkdirs()) { - LOGGER.warning("Unable to create actual file parent dir: " + actualFile.getParentFile()); + final File parentDir = actualFile.getParentFile(); + if (!parentDir.isDirectory()) { + if (parentDir.exists()) { + LOGGER.warning("Actual file parent \"" + parentDir + "\" exists but is not a directory"); + } else if (!parentDir.mkdirs()) { + LOGGER.warning("Unable to create actual file parent dir: " + parentDir); + } } try (FileOutputStream out = new FileOutputStream(actualFile)) { IOUtils.copy(resultStream, out); @@ -424,38 +442,30 @@ public class TestExecutor { return httpResponse; } - public InputStream executeQuery(String str, OutputFormat fmt, String url, List<CompilationUnit.Parameter> params) + public InputStream executeQuery(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params) throws Exception { - HttpUriRequest method = constructHttpMethod(str, url, "query", false, params); + HttpUriRequest method = constructHttpMethod(str, uri, "query", false, params); // Set accepted output response type method.setHeader("Accept", fmt.mimeType()); HttpResponse response = executeAndCheckHttpRequest(method); return response.getEntity().getContent(); } - public InputStream executeQueryService(String str, String url) throws Exception { - return executeQueryService(str, OutputFormat.CLEAN_JSON, url, new ArrayList<>(), false); + public InputStream executeQueryService(String str, URI uri) throws Exception { + return executeQueryService(str, OutputFormat.CLEAN_JSON, uri, new ArrayList<>(), false); } - public InputStream executeQueryService(String str, OutputFormat fmt, String url, + public InputStream executeQueryService(String str, OutputFormat fmt, URI uri, List<CompilationUnit.Parameter> params, boolean jsonEncoded) throws Exception { setParam(params, "format", fmt.mimeType()); - HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, url, "statement", params) - : constructPostMethodUrl(str, url, "statement", params); + HttpUriRequest method = jsonEncoded ? constructPostMethodJson(str, uri, "statement", params) + : constructPostMethodUrl(str, uri, "statement", params); // Set accepted output response type method.setHeader("Accept", OutputFormat.CLEAN_JSON.mimeType()); HttpResponse response = executeHttpRequest(method); return response.getEntity().getContent(); } - public InputStream executeQueryService(String statement, OutputFormat fmt, String url, - List<CompilationUnit.Parameter> params, boolean jsonEncoded, String deferred) throws Exception { - setParam(params, "mode", deferred); - InputStream resultStream = executeQueryService(statement, fmt, url, params, jsonEncoded); - String handle = ResultExtractor.extractHandle(resultStream); - return getHandleResult(handle, fmt); - } - protected void setParam(List<CompilationUnit.Parameter> params, String name, String value) { for (CompilationUnit.Parameter param : params) { if (name.equals(param.getName())) { @@ -479,19 +489,19 @@ public class TestExecutor { return params; } - private HttpUriRequest constructHttpMethod(String statement, String endpoint, String stmtParam, - boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) { - if (statement.length() + endpoint.length() < MAX_URL_LENGTH) { + private HttpUriRequest constructHttpMethod(String statement, URI uri, String stmtParam, + boolean postStmtAsParam, List<CompilationUnit.Parameter> otherParams) throws URISyntaxException { + if (statement.length() + uri.toString().length() < MAX_URL_LENGTH) { // Use GET for small-ish queries - return constructGetMethod(endpoint, injectStatement(statement, stmtParam, otherParams)); + return constructGetMethod(uri, injectStatement(statement, stmtParam, otherParams)); } else { // Use POST for bigger ones to avoid 413 FULL_HEAD String stmtParamName = (postStmtAsParam ? stmtParam : null); - return constructPostMethodUrl(statement, endpoint, stmtParamName, otherParams); + return constructPostMethodUrl(statement, uri, stmtParamName, otherParams); } } - private HttpUriRequest constructGetMethod(String endpoint, List<CompilationUnit.Parameter> params) { + private HttpUriRequest constructGetMethod(URI endpoint, List<CompilationUnit.Parameter> params) { RequestBuilder builder = RequestBuilder.get(endpoint); for (CompilationUnit.Parameter param : params) { builder.addParameter(param.getName(), param.getValue()); @@ -500,17 +510,16 @@ public class TestExecutor { return builder.build(); } - private HttpUriRequest constructGetMethod(String endpoint, OutputFormat fmt, + private HttpUriRequest constructGetMethod(URI endpoint, OutputFormat fmt, List<CompilationUnit.Parameter> params) { - HttpUriRequest method = constructGetMethod(endpoint, params); // Set accepted output response type method.setHeader("Accept", fmt.mimeType()); return method; } - private HttpUriRequest constructPostMethod(String endpoint, List<CompilationUnit.Parameter> params) { - RequestBuilder builder = RequestBuilder.post(endpoint); + private HttpUriRequest constructPostMethod(URI uri, List<CompilationUnit.Parameter> params) { + RequestBuilder builder = RequestBuilder.post(uri); for (CompilationUnit.Parameter param : params) { builder.addParameter(param.getName(), param.getValue()); } @@ -518,18 +527,17 @@ public class TestExecutor { return builder.build(); } - private HttpUriRequest constructPostMethod(String endpoint, OutputFormat fmt, + private HttpUriRequest constructPostMethod(URI uri, OutputFormat fmt, List<CompilationUnit.Parameter> params) { - - HttpUriRequest method = constructPostMethod(endpoint, params); + HttpUriRequest method = constructPostMethod(uri, params); // Set accepted output response type method.setHeader("Accept", fmt.mimeType()); return method; } - protected HttpUriRequest constructPostMethodUrl(String statement, String endpoint, String stmtParam, + protected HttpUriRequest constructPostMethodUrl(String statement, URI uri, String stmtParam, List<CompilationUnit.Parameter> otherParams) { - RequestBuilder builder = RequestBuilder.post(endpoint); + RequestBuilder builder = RequestBuilder.post(uri); if (stmtParam != null) { for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) { builder.addParameter(param.getName(), param.getValue()); @@ -543,12 +551,12 @@ public class TestExecutor { return builder.build(); } - protected HttpUriRequest constructPostMethodJson(String statement, String endpoint, String stmtParam, + protected HttpUriRequest constructPostMethodJson(String statement, URI uri, String stmtParam, List<CompilationUnit.Parameter> otherParams) { if (stmtParam == null) { throw new NullPointerException("Statement parameter required."); } - RequestBuilder builder = RequestBuilder.post(endpoint); + RequestBuilder builder = RequestBuilder.post(uri); ObjectMapper om = new ObjectMapper(); ObjectNode content = om.createObjectNode(); for (CompilationUnit.Parameter param : injectStatement(statement, stmtParam, otherParams)) { @@ -563,23 +571,23 @@ public class TestExecutor { return builder.build(); } - public InputStream executeJSONGet(OutputFormat fmt, String url) throws Exception { - HttpUriRequest request = constructGetMethod(url, fmt, new ArrayList<>()); + public InputStream executeJSONGet(OutputFormat fmt, URI uri) throws Exception { + HttpUriRequest request = constructGetMethod(uri, fmt, new ArrayList<>()); HttpResponse response = executeAndCheckHttpRequest(request); return response.getEntity().getContent(); } - public InputStream executeJSONPost(OutputFormat fmt, String url) throws Exception { - HttpUriRequest request = constructPostMethod(url, fmt, new ArrayList<>()); + public InputStream executeJSONPost(OutputFormat fmt, URI uri) throws Exception { + HttpUriRequest request = constructPostMethod(uri, fmt, new ArrayList<>()); HttpResponse response = executeAndCheckHttpRequest(request); return response.getEntity().getContent(); } // To execute Update statements // Insert and Delete statements are executed here - public void executeUpdate(String str, String url) throws Exception { + public void executeUpdate(String str, URI uri) throws Exception { // Create a method instance. - HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) + HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) .build(); // Execute the method. @@ -587,30 +595,25 @@ public class TestExecutor { } // Executes AQL in either async or async-defer mode. - public InputStream executeAnyAQLAsync(String str, boolean defer, OutputFormat fmt, String url) throws Exception { + public InputStream executeAnyAQLAsync(String statement, boolean defer, OutputFormat fmt, URI uri, + Map<String, Object> variableCtx) throws Exception { // Create a method instance. - HttpUriRequest request = RequestBuilder.post(url) + HttpUriRequest request = RequestBuilder.post(uri) .addParameter("mode", defer ? "asynchronous-deferred" : "asynchronous") - .setEntity(new StringEntity(str, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()).build(); + .setEntity(new StringEntity(statement, StandardCharsets.UTF_8)).setHeader("Accept", fmt.mimeType()) + .build(); + + String handleVar = getHandleVariable(statement); HttpResponse response = executeAndCheckHttpRequest(request); InputStream resultStream = response.getEntity().getContent(); + String handle = IOUtils.toString(resultStream, "UTF-8"); - String theHandle = IOUtils.toString(resultStream, "UTF-8"); - - // take the handle and parse it so results can be retrieved - return getHandleResult(theHandle, fmt); - } - - private InputStream getHandleResult(String handle, OutputFormat fmt) throws Exception { - final String url = getEndpoint(Lets.QUERY_RESULT); - - // Create a method instance. - HttpUriRequest request = RequestBuilder.get(url).addParameter("handle", handle) - .setHeader("Accept", fmt.mimeType()).build(); - - HttpResponse response = executeAndCheckHttpRequest(request); - return response.getEntity().getContent(); + if (handleVar != null) { + variableCtx.put(handleVar, handle); + return resultStream; + } + return null; } // To execute DDL and Update statements @@ -619,9 +622,9 @@ public class TestExecutor { // create index statement // create dataverse statement // create function statement - public void executeDDL(String str, String url) throws Exception { + public void executeDDL(String str, URI uri) throws Exception { // Create a method instance. - HttpUriRequest request = RequestBuilder.post(url).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) + HttpUriRequest request = RequestBuilder.post(uri).setEntity(new StringEntity(str, StandardCharsets.UTF_8)) .build(); // Execute the method. @@ -735,9 +738,10 @@ public class TestExecutor { executeTest(actualPath, testCaseCtx, pb, isDmlRecoveryTest, null); } - public void executeTest(TestCaseContext testCaseCtx, TestFileContext ctx, String statement, - boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, MutableInt queryCount, - List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) throws Exception { + public void executeTestFile(TestCaseContext testCaseCtx, TestFileContext ctx, Map<String, Object> variableCtx, + String statement, boolean isDmlRecoveryTest, ProcessBuilder pb, CompilationUnit cUnit, + MutableInt queryCount, List<TestFileContext> expectedResultFileCtxs, File testFile, String actualPath) + throws Exception { File qbcFile; boolean failed = false; File expectedResultFile; @@ -762,17 +766,11 @@ public class TestExecutor { ResultExtractor.extract(resultStream); } break; + case "pollget": case "pollquery": // polltimeoutsecs=nnn, polldelaysecs=nnn - final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement); - int timeoutSecs; - if (timeoutMatcher.find()) { - timeoutSecs = Integer.parseInt(timeoutMatcher.group(1)); - } else { - throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file"); - } - final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement); - int retryDelaySecs = retryDelayMatcher.find() ? Integer.parseInt(timeoutMatcher.group(1)) : 1; + int timeoutSecs = getTimeoutSecs(statement); + int retryDelaySecs = getRetryDelaySecs(statement); long startTime = System.currentTimeMillis(); long limitTime = startTime + TimeUnit.SECONDS.toMillis(timeoutSecs); ctx.setType(ctx.getType().substring("poll".length())); @@ -780,7 +778,7 @@ public class TestExecutor { LOGGER.fine("polling for up to " + timeoutSecs + " seconds w/ " + retryDelaySecs + " second(s) delay"); while (true) { try { - executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, + executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs, testFile, actualPath); finalException = null; break; @@ -800,7 +798,7 @@ public class TestExecutor { break; case "query": case "async": - case "asyncdefer": + case "deferred": // isDmlRecoveryTest: insert Crash and Recovery if (isDmlRecoveryTest) { executeScript(pb, pb.environment().get("SCRIPT_HOME") + File.separator + "dml_recovery" @@ -810,31 +808,43 @@ public class TestExecutor { } InputStream resultStream = null; OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); + final String reqType = ctx.getType(); + final List<CompilationUnit.Parameter> params = cUnit.getParameter(); if (ctx.getFile().getName().endsWith("aql")) { - if (ctx.getType().equalsIgnoreCase("query")) { - resultStream = executeQuery(statement, fmt, getEndpoint(Lets.AQL_QUERY), - cUnit.getParameter()); - } else if (ctx.getType().equalsIgnoreCase("async")) { - resultStream = executeAnyAQLAsync(statement, false, fmt, getEndpoint(Lets.AQL)); - } else if (ctx.getType().equalsIgnoreCase("asyncdefer")) { - resultStream = executeAnyAQLAsync(statement, true, fmt, getEndpoint(Lets.AQL)); + if (reqType.equalsIgnoreCase("query")) { + resultStream = executeQuery(statement, fmt, getEndpoint(Lets.AQL_QUERY), params); + } else { + final URI endpoint = getEndpoint(Lets.AQL); + if (reqType.equalsIgnoreCase("async")) { + resultStream = executeAnyAQLAsync(statement, false, fmt, endpoint, variableCtx); + } else if (reqType.equalsIgnoreCase("deferred")) { + resultStream = executeAnyAQLAsync(statement, true, fmt, endpoint, variableCtx); + } + Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), resultStream); } } else { - final String reqType = ctx.getType(); - final String url = getEndpoint(Lets.QUERY_SERVICE); - final List<CompilationUnit.Parameter> params = cUnit.getParameter(); - if (reqType.equalsIgnoreCase("query")) { - resultStream = executeQueryService(statement, fmt, url, params, true); + String delivery = DELIVERY_IMMEDIATE; + if (reqType.equalsIgnoreCase("async")) { + delivery = DELIVERY_ASYNC; + } else if (reqType.equalsIgnoreCase("deferred")) { + delivery = DELIVERY_DEFERRED; + } + final URI uri = getEndpoint(Lets.QUERY_SERVICE); + if (DELIVERY_IMMEDIATE.equals(delivery)) { + resultStream = executeQueryService(statement, fmt, uri, params, true); resultStream = ResultExtractor.extract(resultStream); - } else if (reqType.equalsIgnoreCase("async")) { - resultStream = executeQueryService(statement, fmt, url, params, true, "async"); - } else if (reqType.equalsIgnoreCase("asyncdefer")) { - resultStream = executeQueryService(statement, fmt, url, params, true, "deferred"); + } else { + String handleVar = getHandleVariable(statement); + setParam(params, "mode", delivery); + resultStream = executeQueryService(statement, fmt, uri, params, true); + String handle = ResultExtractor.extractHandle(resultStream); + Assert.assertNotNull("no handle for " + reqType + " test " + testFile.toString(), handleVar); + variableCtx.put(handleVar, handle); } } if (queryCount.intValue() >= expectedResultFileCtxs.size()) { - throw new IllegalStateException("no result file for " + testFile.toString() + "; queryCount: " - + queryCount + ", filectxs.size: " + expectedResultFileCtxs.size()); + Assert.fail("no result file for " + testFile.toString() + "; queryCount: " + queryCount + + ", filectxs.size: " + expectedResultFileCtxs.size()); } expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile(); @@ -938,17 +948,9 @@ public class TestExecutor { "Unexpected format for method " + ctx.getType() + ": " + ctx.extension()); } fmt = OutputFormat.forCompilationUnit(cUnit); - String endpoint = stripJavaComments(statement).trim(); - switch (ctx.getType()) { - case "get": - resultStream = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint); - break; - case "post": - resultStream = executeJSONPost(fmt, "http://" + host + ":" + port + endpoint); - break; - default: - throw new IllegalStateException("NYI: " + ctx.getType()); - } + final String trimmedPathAndQuery = stripLineComments(stripJavaComments(statement)).trim(); + final String variablesReplaced = replaceVarRef(trimmedPathAndQuery, variableCtx); + resultStream = executeHttp(ctx.getType(), variablesReplaced, fmt); expectedResultFile = expectedResultFileCtxs.get(queryCount.intValue()).getFile(); actualResultFile = testCaseCtx.getActualResultFile(cUnit, expectedResultFile, new File(actualPath)); writeOutputToFile(actualResultFile, resultStream); @@ -1047,11 +1049,56 @@ public class TestExecutor { } } + protected int getTimeoutSecs(String statement) { + final Matcher timeoutMatcher = POLL_TIMEOUT_PATTERN.matcher(statement); + if (timeoutMatcher.find()) { + return Integer.parseInt(timeoutMatcher.group(1)); + } else { + throw new IllegalArgumentException("ERROR: polltimeoutsecs=nnn must be present in poll file"); + } + } + + protected static int getRetryDelaySecs(String statement) { + final Matcher retryDelayMatcher = POLL_DELAY_PATTERN.matcher(statement); + return retryDelayMatcher.find() ? Integer.parseInt(retryDelayMatcher.group(1)) : 1; + } + + protected static String getHandleVariable(String statement) { + final Matcher handleVariableMatcher = HANDLE_VARIABLE_PATTERN.matcher(statement); + return handleVariableMatcher.find() ? handleVariableMatcher.group(1) : null; + } + + protected static String replaceVarRef(String statement, Map<String, Object> variableCtx) { + String tmpStmt = statement; + Matcher variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt); + while (variableReferenceMatcher.find()) { + String var = variableReferenceMatcher.group(1); + Object value = variableCtx.get(var); + Assert.assertNotNull("No value for variable reference $" + var, value); + tmpStmt = tmpStmt.replace("$" + var, String.valueOf(value)); + variableReferenceMatcher = VARIABLE_REF_PATTERN.matcher(tmpStmt); + } + return tmpStmt; + } + + protected InputStream executeHttp(String ctxType, String endpoint, OutputFormat fmt) throws Exception { + String[] split = endpoint.split("\\?"); + URI uri = new URI("http", null, host, port, split[0], split.length > 1 ? split[1] : null, null); + switch (ctxType) { + case "get": + return executeJSONGet(fmt, uri); + case "post": + return executeJSONPost(fmt, uri); + default: + throw new AssertionError("Not implemented: " + ctxType); + } + } + private void killNC(String nodeId, CompilationUnit cUnit) throws Exception { //get node process id OutputFormat fmt = OutputFormat.forCompilationUnit(cUnit); String endpoint = "/admin/cluster/node/" + nodeId + "/config"; - InputStream executeJSONGet = executeJSONGet(fmt, "http://" + host + ":" + port + endpoint); + InputStream executeJSONGet = executeJSONGet(fmt, new URI("http", null, host, port, endpoint, null, null)); StringWriter actual = new StringWriter(); IOUtils.copy(executeJSONGet, actual, StandardCharsets.UTF_8); String config = actual.toString(); @@ -1065,10 +1112,6 @@ public class TestExecutor { public void executeTest(String actualPath, TestCaseContext testCaseCtx, ProcessBuilder pb, boolean isDmlRecoveryTest, TestGroup failedGroup) throws Exception { - File testFile; - String statement; - List<TestFileContext> expectedResultFileCtxs; - List<TestFileContext> testFileCtxs; MutableInt queryCount = new MutableInt(0); int numOfErrors = 0; int numOfFiles = 0; @@ -1076,14 +1119,15 @@ public class TestExecutor { for (CompilationUnit cUnit : cUnits) { LOGGER.info( "Starting [TEST]: " + testCaseCtx.getTestCase().getFilePath() + "/" + cUnit.getName() + " ... "); - testFileCtxs = testCaseCtx.getTestFiles(cUnit); - expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit); + Map<String, Object> variableCtx = new HashMap<>(); + List<TestFileContext> testFileCtxs = testCaseCtx.getTestFiles(cUnit); + List<TestFileContext> expectedResultFileCtxs = testCaseCtx.getExpectedResultFiles(cUnit); for (TestFileContext ctx : testFileCtxs) { numOfFiles++; - testFile = ctx.getFile(); - statement = readTestFile(testFile); + final File testFile = ctx.getFile(); + final String statement = readTestFile(testFile); try { - executeTest(testCaseCtx, ctx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, + executeTestFile(testCaseCtx, ctx, variableCtx, statement, isDmlRecoveryTest, pb, cUnit, queryCount, expectedResultFileCtxs, testFile, actualPath); } catch (Exception e) { System.err.println("testFile " + testFile.toString() + " raised an exception: " + e); @@ -1140,14 +1184,19 @@ public class TestExecutor { return servlet.getPath(); } - protected String getEndpoint(Lets servlet) { - return "http://" + host + ":" + port + getPath(servlet).replaceAll("/\\*$", ""); + protected URI getEndpoint(Lets servlet) throws URISyntaxException { + return new URI("http", null, host, port, getPath(servlet).replaceAll("/\\*$", ""), null, null); } public static String stripJavaComments(String text) { return JAVA_BLOCK_COMMENT_PATTERN.matcher(text).replaceAll(""); } + public static String stripLineComments(String text) { + final String s = SHELL_LINE_COMMENT_PATTERN.matcher(text).replaceAll(""); + return JAVA_LINE_COMMENT_PATTERN.matcher(s).replaceAll(""); + } + public void cleanup(String testCase, List<String> badtestcases) throws Exception { try { ArrayList<String> toBeDropped = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java index 50d103a..2eada38 100644 --- a/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java +++ b/asterixdb/asterix-installer/src/test/java/org/apache/asterix/installer/test/ManagixSqlppExecutionIT.java @@ -29,7 +29,7 @@ import org.junit.runners.Parameterized.Parameters; @RunWith(Parameterized.class) public class ManagixSqlppExecutionIT extends ManagixExecutionIT { - @Parameters + @Parameters(name = "ManagixSqlppExecutionIT {index}: {0}") public static Collection<Object[]> tests() throws Exception { Collection<Object[]> testArgs = buildTestsInXml("only_sqlpp.xml"); if (testArgs.size() == 0) { @@ -48,10 +48,7 @@ public class ManagixSqlppExecutionIT extends ManagixExecutionIT { } - private TestCaseContext tcCtx; - public ManagixSqlppExecutionIT(TestCaseContext tcCtx) { super(tcCtx); - this.tcCtx = tcCtx; } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java index 6eaf4ae..0769596 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/functions/BuiltinFunctions.java @@ -92,6 +92,7 @@ import org.apache.asterix.om.typecomputer.impl.RecordMergeTypeComputer; import org.apache.asterix.om.typecomputer.impl.RecordPairsTypeComputer; import org.apache.asterix.om.typecomputer.impl.RecordRemoveFieldsTypeComputer; import org.apache.asterix.om.typecomputer.impl.ScalarVersionOfAggregateResultType; +import org.apache.asterix.om.typecomputer.impl.SleepTypeComputer; import org.apache.asterix.om.typecomputer.impl.StringBooleanTypeComputer; import org.apache.asterix.om.typecomputer.impl.StringInt32TypeComputer; import org.apache.asterix.om.typecomputer.impl.StringIntToStringTypeComputer; @@ -651,6 +652,8 @@ public class BuiltinFunctions { "spatial-cell", 4); public static final FunctionIdentifier SWITCH_CASE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "switch-case", FunctionIdentifier.VARARGS); + public static final FunctionIdentifier SLEEP = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, + "sleep", 2); public static final FunctionIdentifier INJECT_FAILURE = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, "inject-failure", 2); public static final FunctionIdentifier FLOW_RECORD = new FunctionIdentifier(FunctionConstants.ASTERIX_NS, @@ -1058,6 +1061,7 @@ public class BuiltinFunctions { addPrivateFunction(SUBSET_COLLECTION, SubsetCollectionTypeComputer.INSTANCE, true); addFunction(SUBSTRING, SubstringTypeComputer.INSTANCE, true); addFunction(SWITCH_CASE, SwitchCaseComputer.INSTANCE, true); + addFunction(SLEEP, SleepTypeComputer.INSTANCE, false); addPrivateFunction(INJECT_FAILURE, InjectFailureTypeComputer.INSTANCE, true); addPrivateFunction(CAST_TYPE, CastTypeComputer.INSTANCE, true); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java index df050be..cc19ac4 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/InjectFailureTypeComputer.java @@ -21,29 +21,31 @@ package org.apache.asterix.om.typecomputer.impl; import org.apache.asterix.om.exceptions.TypeMismatchException; import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer; import org.apache.asterix.om.types.ATypeTag; -import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +/** + * The first argument of INJECT_FAILURE can be any data model instance and will be passed verbatim to the + * caller. The second argument is a boolean that determines if the invocation throws an exception. + * + * Consequently {@link #checkArgType(String, int, IAType)} validates that the second argument is a + * boolean and {@link #getResultType(ILogicalExpression, IAType...)} returns the type of the first + * argument. + */ public class InjectFailureTypeComputer extends AbstractResultTypeComputer { public static final InjectFailureTypeComputer INSTANCE = new InjectFailureTypeComputer(); - protected InjectFailureTypeComputer() { - } - @Override protected void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException { - ATypeTag actualTypeTag = type.getTypeTag(); - if (actualTypeTag != ATypeTag.BOOLEAN) { - throw new TypeMismatchException(funcName, argIndex, actualTypeTag, ATypeTag.BOOLEAN); + if (argIndex == 1 && type.getTypeTag() != ATypeTag.BOOLEAN) { + throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.BOOLEAN); } } @Override protected IAType getResultType(ILogicalExpression expr, IAType... strippedInputTypes) throws AlgebricksException { - return BuiltinType.ABOOLEAN; + return strippedInputTypes[0]; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java new file mode 100644 index 0000000..6b885e3 --- /dev/null +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/om/typecomputer/impl/SleepTypeComputer.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.om.typecomputer.impl; + +import org.apache.asterix.om.exceptions.TypeMismatchException; +import org.apache.asterix.om.typecomputer.base.AbstractResultTypeComputer; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.IAType; +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; + +public class SleepTypeComputer extends AbstractResultTypeComputer { + public static final SleepTypeComputer INSTANCE = new SleepTypeComputer(); + + @Override + public void checkArgType(String funcName, int argIndex, IAType type) throws AlgebricksException { + if (argIndex == 1) { + switch (type.getTypeTag()) { + case INT8: + case INT16: + case INT32: + case INT64: + break; + default: + throw new TypeMismatchException(funcName, argIndex, type.getTypeTag(), ATypeTag.INT8, + ATypeTag.INT16, ATypeTag.INT32, ATypeTag.INT64); + } + } + } + + @Override + public IAType getResultType(ILogicalExpression expr, IAType... types) throws AlgebricksException { + return types[0]; + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java index 164f369..af5f690 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/InjectFailureDescriptor.java @@ -18,6 +18,9 @@ */ package org.apache.asterix.runtime.evaluators.functions; +import java.util.logging.Level; +import java.util.logging.Logger; + import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; @@ -39,6 +42,9 @@ import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; public class InjectFailureDescriptor extends AbstractScalarFunctionDynamicDescriptor { private static final long serialVersionUID = 1L; + + private static final Logger LOGGER = Logger.getLogger(SleepDescriptor.class.getSimpleName()); + public static final IFunctionDescriptorFactory FACTORY = new IFunctionDescriptorFactory() { @Override public IFunctionDescriptor createFunctionDescriptor() { @@ -75,6 +81,7 @@ public class InjectFailureDescriptor extends AbstractScalarFunctionDynamicDescri boolean argResult = ABooleanSerializerDeserializer.getBoolean(argPtr.getByteArray(), argPtr.getStartOffset() + 1); if (argResult) { + LOGGER.log(Level.SEVERE, ctx.getTaskAttemptId() + " injecting failure"); throw new RuntimeDataException(ErrorCode.INJECTED_FAILURE, getIdentifier()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java new file mode 100644 index 0000000..a186b32 --- /dev/null +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/SleepDescriptor.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.runtime.evaluators.functions; + +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.om.functions.BuiltinFunctions; +import org.apache.asterix.om.functions.IFunctionDescriptorFactory; +import org.apache.asterix.om.types.hierachy.ATypeHierarchy; +import org.apache.asterix.runtime.evaluators.base.AbstractScalarFunctionDynamicDescriptor; +import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.data.std.api.IPointable; +import org.apache.hyracks.data.std.primitive.VoidPointable; +import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; + +public class SleepDescriptor extends AbstractScalarFunctionDynamicDescriptor { + + private static final long serialVersionUID = 1L; + + private static final Logger LOGGER = Logger.getLogger(SleepDescriptor.class.getSimpleName()); + + public static final IFunctionDescriptorFactory FACTORY = SleepDescriptor::new; + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(final IScalarEvaluatorFactory[] args) { + return new IScalarEvaluatorFactory() { + private static final long serialVersionUID = 1L; + + @Override + public IScalarEvaluator createScalarEvaluator(final IHyracksTaskContext ctx) throws HyracksDataException { + return new IScalarEvaluator() { + + private IPointable argTime = new VoidPointable(); + private final IScalarEvaluator evalValue = args[0].createScalarEvaluator(ctx); + private final IScalarEvaluator evalTime = args[1].createScalarEvaluator(ctx); + + @Override + public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { + evalValue.evaluate(tuple, result); + evalTime.evaluate(tuple, argTime); + + final byte[] bytes = argTime.getByteArray(); + final int offset = argTime.getStartOffset(); + final long time = ATypeHierarchy.getLongValue(getIdentifier().getName(), 1, bytes, offset); + + try { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " sleeping for " + time + " ms"); + } + Thread.sleep(time); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.log(Level.INFO, ctx.getTaskAttemptId() + " done sleeping for " + time + " ms"); + } + } + } + }; + } + }; + } + + @Override + public FunctionIdentifier getIdentifier() { + return BuiltinFunctions.SLEEP; + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java index c926ec0..175ecc4 100644 --- a/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java +++ b/asterixdb/asterix-server/src/test/java/org/apache/asterix/server/test/SampleLocalClusterIT.java @@ -24,6 +24,7 @@ import java.io.File; import java.io.IOException; import java.io.InputStream; import java.io.StringWriter; +import java.net.URI; import java.net.URL; import java.util.Collections; @@ -89,7 +90,8 @@ public class SampleLocalClusterIT { public void test1_sanityQuery() throws Exception { TestExecutor testExecutor = new TestExecutor(); InputStream resultStream = testExecutor.executeQuery("1+1", OutputFormat.ADM, - "http://127.0.0.1:19002" + Lets.AQL_QUERY.getPath(), Collections.emptyList()); + new URI("http", null, "127.0.0.1", 19002, Lets.AQL_QUERY.getPath(), null, null), + Collections.emptyList()); StringWriter sw = new StringWriter(); IOUtils.copy(resultStream, sw); Assert.assertEquals("2", sw.toString().trim()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java index e41a624..7a53a9e 100644 --- a/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java +++ b/asterixdb/asterix-test-framework/src/main/java/org/apache/asterix/testframework/context/TestFileContext.java @@ -59,10 +59,16 @@ public class TestFileContext implements Comparable<TestFileContext> { @Override public int compareTo(TestFileContext o) { - if (this.seqNum > o.seqNum) + if (this.seqNum > o.seqNum) { return 1; - else if (this.seqNum < o.seqNum) + } else if (this.seqNum < o.seqNum) { return -1; + } return 0; } + + @Override + public String toString() { + return String.valueOf(seqNum) + ":" + type + ":" + file; + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java index a50e1ee..3165840 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetDirectoryRecord.java @@ -71,17 +71,24 @@ public class DatasetDirectoryRecord implements Serializable { } public void start() { - status = Status.RUNNING; + updateStatus(Status.RUNNING); } public void writeEOS() { - status = Status.SUCCESS; + updateStatus(Status.SUCCESS); } public void fail() { status = Status.FAILED; } + private void updateStatus(final DatasetDirectoryRecord.Status newStatus) { + // FAILED is a stable status + if (status != Status.FAILED) { + status = newStatus; + } + } + public Status getStatus() { return status; } @@ -99,6 +106,6 @@ public class DatasetDirectoryRecord implements Serializable { @Override public String toString() { - return address.toString() + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : ""); + return String.valueOf(address) + " " + status + (empty ? " (empty)" : "") + (readEOS ? " (EOS)" : ""); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java index 34ed65c..f29ff4a 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/DatasetJobRecord.java @@ -20,9 +20,14 @@ package org.apache.hyracks.api.dataset; import java.util.HashMap; import java.util.List; +import java.util.Map; -public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> implements IDatasetStateRecord { +import org.apache.hyracks.api.exceptions.ErrorCode; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class DatasetJobRecord implements IDatasetStateRecord { public enum Status { + IDLE, RUNNING, SUCCESS, FAILED @@ -36,20 +41,30 @@ public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> im private List<Exception> exceptions; + private Map<ResultSetId, ResultSetMetaData> resultSetMetadataMap = new HashMap<>(); + public DatasetJobRecord() { this.timestamp = System.currentTimeMillis(); - this.status = Status.RUNNING; + this.status = Status.IDLE; + } + + private void updateStatus(Status newStatus) { + // FAILED is a stable status + if (status != Status.FAILED) { + status = newStatus; + } } public void start() { - status = Status.RUNNING; + updateStatus(Status.RUNNING); } public void success() { - status = Status.SUCCESS; + updateStatus(Status.SUCCESS); } - public void fail() { + public void fail(ResultSetId rsId, int partition) { + getOrCreateDirectoryRecord(rsId, partition).fail(); status = Status.FAILED; } @@ -58,6 +73,7 @@ public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> im this.exceptions = exceptions; } + @Override public long getTimestamp() { return timestamp; } @@ -66,7 +82,57 @@ public class DatasetJobRecord extends HashMap<ResultSetId, ResultSetMetaData> im return status; } + @Override + public String toString() { + return resultSetMetadataMap.toString(); + } + public List<Exception> getExceptions() { return exceptions; } + + public void setResultSetMetaData(ResultSetId rsId, boolean orderedResult, int nPartitions) throws + HyracksDataException { + ResultSetMetaData rsMd = resultSetMetadataMap.get(rsId); + if (rsMd == null) { + resultSetMetadataMap.put(rsId, new ResultSetMetaData(nPartitions, orderedResult)); + } else if (rsMd.getOrderedResult() != orderedResult || rsMd.getRecords().length != nPartitions) { + throw HyracksDataException.create(ErrorCode.INCONSISTENT_RESULT_METADATA, rsId.toString()); + } + //TODO(tillw) throwing a HyracksDataException here hangs the execution tests + } + + public ResultSetMetaData getResultSetMetaData(ResultSetId rsId) { + return resultSetMetadataMap.get(rsId); + } + + public synchronized DatasetDirectoryRecord getOrCreateDirectoryRecord(ResultSetId rsId, int partition) { + DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + if (records[partition] == null) { + records[partition] = new DatasetDirectoryRecord(); + } + return records[partition]; + } + + public synchronized DatasetDirectoryRecord getDirectoryRecord(ResultSetId rsId, int partition) throws + HyracksDataException { + DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + if (records[partition] == null) { + throw new HyracksDataException("no record for partition " + partition + " of result set " + rsId); + } + return records[partition]; + } + + public synchronized void updateStatus(ResultSetId rsId) { + int successCount = 0; + DatasetDirectoryRecord[] records = getResultSetMetaData(rsId).getRecords(); + for (DatasetDirectoryRecord record : records) { + if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) { + successCount++; + } + } + if (successCount == records.length) { + success(); + } + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java index 2285981..8e9e3dc 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/dataset/ResultSetMetaData.java @@ -18,14 +18,15 @@ */ package org.apache.hyracks.api.dataset; -public class ResultSetMetaData { - private final boolean ordered; +import java.util.Arrays; +public class ResultSetMetaData { private final DatasetDirectoryRecord[] records; + private final boolean ordered; - public ResultSetMetaData(boolean ordered, DatasetDirectoryRecord[] records) { + ResultSetMetaData(int len, boolean ordered) { + this.records = new DatasetDirectoryRecord[len]; this.ordered = ordered; - this.records = records; } public boolean getOrderedResult() { @@ -35,4 +36,11 @@ public class ResultSetMetaData { public DatasetDirectoryRecord[] getRecords() { return records; } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("{ordered: ").append(ordered).append(", records: ").append(Arrays.toString(records)); + return sb.toString(); + } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java index d094368..35002f8 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java @@ -49,6 +49,10 @@ public class ErrorCode { public static final int ILLEGAL_WRITE_AFTER_FLUSH_ATTEMPT = 12; public static final int DUPLICATE_IODEVICE = 13; public static final int NESTED_IODEVICES = 14; + public static final int MORE_THAN_ONE_RESULT = 15; + public static final int RESULT_FAILURE_EXCEPTION = 16; + public static final int RESULT_FAILURE_NO_EXCEPTION = 17; + public static final int INCONSISTENT_RESULT_METADATA = 18; // Compilation error codes. public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10001; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java index 0fd6923..404104d 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/HyracksDataException.java @@ -36,6 +36,11 @@ public class HyracksDataException extends HyracksException { return new HyracksDataException(ErrorCode.HYRACKS, code, ErrorCode.getErrorMessage(code), cause, params); } + public static HyracksDataException create(HyracksDataException e, String nodeId) { + return new HyracksDataException(e.getComponent(), e.getErrorCode(), e.getMessage(), e.getCause(), nodeId, e + .getParams()); + } + public HyracksDataException(String component, int errorCode, String message, Throwable cause, String nodeId, Serializable... params) { super(component, errorCode, message, cause, nodeId, params); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java index b1fa494..7969700 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/job/JobId.java @@ -26,6 +26,9 @@ import java.io.Serializable; import org.apache.hyracks.api.io.IWritable; public final class JobId implements IWritable, Serializable { + + public static final JobId INVALID = new JobId(-1l); + private static final long serialVersionUID = 1L; private long id; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties index 72f7c65..de58f33 100644 --- a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties @@ -33,5 +33,9 @@ 12 = Invalid attempt to write to a flushed append only metadata page 13 = Duplicate IODevices are not allowed 14 = IODevices should not be nested within each other +15 = More than 1 result for job %1$s +16 = Failure producing result set %1$s for job %2$s +17 = No exception for failed result set %1$s for job %2$s +18 = Inconsistent metadata for result set %1$s" 10000 = The given rule collection %1$s is not an instance of the List class. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java index ae0f361..37c4177 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/ClusterControllerService.java @@ -325,10 +325,14 @@ public class ClusterControllerService implements IControllerService { return workQueue; } - public Executor getExecutor() { + public ExecutorService getExecutorService() { return executor; } + public Executor getExecutor() { + return getExecutorService(); + } + public CCConfig getConfig() { return ccConfig; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java index 4d7d1c3..46a173e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/DatasetDirectoryService.java @@ -25,6 +25,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.logging.Level; +import java.util.logging.Logger; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataset.DatasetDirectoryRecord; @@ -33,6 +35,7 @@ import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; import org.apache.hyracks.api.dataset.IDatasetStateRecord; import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.dataset.ResultSetMetaData; +import org.apache.hyracks.api.exceptions.ErrorCode; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.job.IActivityClusterGraphGeneratorFactory; @@ -48,6 +51,9 @@ import org.apache.hyracks.control.common.work.IResultCallback; * job. */ public class DatasetDirectoryService implements IDatasetDirectoryService { + + private static final Logger LOGGER = Logger.getLogger(DatasetDirectoryService.class.getName()); + private final long resultTTL; private final long resultSweepThreshold; @@ -62,22 +68,24 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { @Override public void init(ExecutorService executor) { - executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold)); + executor.execute(new ResultStateSweeper(this, resultTTL, resultSweepThreshold, LOGGER)); } @Override public synchronized void notifyJobCreation(JobId jobId, IActivityClusterGraphGeneratorFactory acggf) throws HyracksException { - DatasetJobRecord djr = getDatasetJobRecord(jobId); - if (djr == null) { - djr = new DatasetJobRecord(); - jobResultLocations.put(jobId, new JobResultInfo(djr, null)); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info(getClass().getSimpleName() + " notified of new job " + jobId); + } + if (jobResultLocations.get(jobId) != null) { + throw HyracksDataException.create(ErrorCode.MORE_THAN_ONE_RESULT, jobId); } + jobResultLocations.put(jobId, new JobResultInfo(new DatasetJobRecord(), null)); } @Override public void notifyJobStart(JobId jobId) throws HyracksException { - // Auto-generated method stub + jobResultLocations.get(jobId).getRecord().start(); } @Override @@ -87,35 +95,36 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { private DatasetJobRecord getDatasetJobRecord(JobId jobId) { final JobResultInfo jri = jobResultLocations.get(jobId); - return jri == null ? null : jri.record; + return jri == null ? null : jri.getRecord(); } - @Override - public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, - boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) { - DatasetJobRecord djr = getDatasetJobRecord(jobId); - - ResultSetMetaData resultSetMetaData = djr.get(rsId); - if (resultSetMetaData == null) { - resultSetMetaData = new ResultSetMetaData(orderedResult, new DatasetDirectoryRecord[nPartitions]); - djr.put(rsId, resultSetMetaData); - } - - DatasetDirectoryRecord[] records = resultSetMetaData.getRecords(); - if (records[partition] == null) { - records[partition] = new DatasetDirectoryRecord(); + private DatasetJobRecord getNonNullDatasetJobRecord(JobId jobId) { + final DatasetJobRecord djr = getDatasetJobRecord(jobId); + if (djr == null) { + throw new NullPointerException(); } - records[partition].setNetworkAddress(networkAddress); - records[partition].setEmpty(emptyResult); - records[partition].start(); + return djr; + } - Waiters waiters = jobResultLocations.get(jobId).waiters; - Waiter waiter = waiters != null ? waiters.get(rsId) : null; + @Override + public synchronized void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, + boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) throws + HyracksDataException { + DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId); + djr.setResultSetMetaData(rsId, orderedResult, nPartitions); + DatasetDirectoryRecord record = djr.getOrCreateDirectoryRecord(rsId, partition); + + record.setNetworkAddress(networkAddress); + record.setEmpty(emptyResult); + record.start(); + + final JobResultInfo jobResultInfo = jobResultLocations.get(jobId); + Waiter waiter = jobResultInfo.getWaiter(rsId); if (waiter != null) { try { DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, waiter.knownRecords); if (updatedRecords != null) { - waiters.remove(rsId); + jobResultInfo.removeWaiter(rsId); waiter.callback.setValue(updatedRecords); } } catch (Exception e) { @@ -126,51 +135,28 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { } @Override - public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) { - int successCount = 0; - - DatasetJobRecord djr = getDatasetJobRecord(jobId); - ResultSetMetaData resultSetMetaData = djr.get(rsId); - DatasetDirectoryRecord[] records = resultSetMetaData.getRecords(); - records[partition].writeEOS(); - - for (DatasetDirectoryRecord record : records) { - if ((record != null) && (record.getStatus() == DatasetDirectoryRecord.Status.SUCCESS)) { - successCount++; - } - } - if (successCount == records.length) { - djr.success(); - } + public synchronized void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) + throws HyracksDataException { + DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId); + djr.getDirectoryRecord(rsId, partition).writeEOS(); + djr.updateStatus(rsId); notifyAll(); } @Override public synchronized void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition) { - DatasetJobRecord djr = getDatasetJobRecord(jobId); - if (djr != null) { - djr.fail(); - } - final Waiters waiters = jobResultLocations.get(jobId).waiters; - if (waiters != null) { - waiters.get(rsId).callback.setException(new Exception()); - waiters.remove(rsId); - } + DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId); + djr.fail(rsId, partition); + jobResultLocations.get(jobId).setException(new Exception()); notifyAll(); } @Override public synchronized void reportJobFailure(JobId jobId, List<Exception> exceptions) { - DatasetJobRecord djr = getDatasetJobRecord(jobId); - if (djr != null) { - djr.fail(exceptions); - } - final Waiters waiters = jobResultLocations.get(jobId).waiters; - if (waiters != null) { - for (ResultSetId rsId : waiters.keySet()) { - waiters.remove(rsId).callback.setException(exceptions.get(0)); - } - } + DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId); + djr.fail(exceptions); + // TODO(tillw) throwing an NPE here hangs the system, why? + jobResultLocations.get(jobId).setException(exceptions.isEmpty() ? null : exceptions.get(0)); notifyAll(); } @@ -184,7 +170,6 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { throw new HyracksDataException(e); } } - return djr.getStatus(); } @@ -195,7 +180,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { @Override public IDatasetStateRecord getState(JobId jobId) { - return jobResultLocations.get(jobId).record; + return getDatasetJobRecord(jobId); } @Override @@ -210,20 +195,7 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { throws HyracksDataException { DatasetDirectoryRecord[] updatedRecords = updatedRecords(jobId, rsId, knownRecords); if (updatedRecords == null) { - JobResultInfo jri = jobResultLocations.get(jobId); - Waiters waiters; - if (jri == null) { - waiters = new Waiters(); - jri = new JobResultInfo(null, waiters); - jobResultLocations.put(jobId, jri); - } else { - waiters = jri.waiters; - if (waiters == null) { - waiters = new Waiters(); - jri.waiters = waiters; - } - } - waiters.put(rsId, new Waiter(knownRecords, callback)); + jobResultLocations.get(jobId).addWaiter(rsId, knownRecords, callback); } else { callback.setValue(updatedRecords); } @@ -248,26 +220,25 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { */ private DatasetDirectoryRecord[] updatedRecords(JobId jobId, ResultSetId rsId, DatasetDirectoryRecord[] knownRecords) throws HyracksDataException { - DatasetJobRecord djr = getDatasetJobRecord(jobId); - - if (djr == null) { - throw new HyracksDataException("Requested JobId " + jobId + " doesn't exist"); - } + DatasetJobRecord djr = getNonNullDatasetJobRecord(jobId); if (djr.getStatus() == Status.FAILED) { List<Exception> caughtExceptions = djr.getExceptions(); - if (caughtExceptions == null) { - throw new HyracksDataException("Job failed."); + if (caughtExceptions != null && !caughtExceptions.isEmpty()) { + final Exception cause = caughtExceptions.get(caughtExceptions.size() - 1); + if (cause instanceof HyracksDataException) { + throw (HyracksDataException) cause; + } + throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_EXCEPTION, cause, rsId, jobId); } else { - throw new HyracksDataException(caughtExceptions.get(caughtExceptions.size() - 1)); + throw HyracksDataException.create(ErrorCode.RESULT_FAILURE_NO_EXCEPTION, rsId, jobId); } } - ResultSetMetaData resultSetMetaData = djr.get(rsId); - if (resultSetMetaData == null || resultSetMetaData.getRecords() == null) { + final ResultSetMetaData resultSetMetaData = djr.getResultSetMetaData(rsId); + if (resultSetMetaData == null) { return null; } - DatasetDirectoryRecord[] records = resultSetMetaData.getRecords(); return Arrays.equals(records, knownRecords) ? null : records; @@ -275,13 +246,42 @@ public class DatasetDirectoryService implements IDatasetDirectoryService { } class JobResultInfo { + + private DatasetJobRecord record; + private Waiters waiters; + JobResultInfo(DatasetJobRecord record, Waiters waiters) { this.record = record; this.waiters = waiters; } - DatasetJobRecord record; - Waiters waiters; + DatasetJobRecord getRecord() { + return record; + } + + void addWaiter(ResultSetId rsId, DatasetDirectoryRecord[] knownRecords, + IResultCallback<DatasetDirectoryRecord[]> callback) { + if (waiters == null) { + waiters = new Waiters(); + } + waiters.put(rsId, new Waiter(knownRecords, callback)); + } + + Waiter removeWaiter(ResultSetId rsId) { + return waiters.remove(rsId); + } + + Waiter getWaiter(ResultSetId rsId) { + return waiters != null ? waiters.get(rsId) : null; + } + + void setException(Exception exception) { + if (waiters != null) { + for (ResultSetId rsId : waiters.keySet()) { + waiters.remove(rsId).callback.setException(exception); + } + } + } } class Waiters extends HashMap<ResultSetId, Waiter> { @@ -289,11 +289,11 @@ class Waiters extends HashMap<ResultSetId, Waiter> { } class Waiter { + DatasetDirectoryRecord[] knownRecords; + IResultCallback<DatasetDirectoryRecord[]> callback; + Waiter(DatasetDirectoryRecord[] knownRecords, IResultCallback<DatasetDirectoryRecord[]> callback) { this.knownRecords = knownRecords; this.callback = callback; } - - DatasetDirectoryRecord[] knownRecords; - IResultCallback<DatasetDirectoryRecord[]> callback; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java index 9e4e03e..663a53a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/dataset/IDatasetDirectoryService.java @@ -35,9 +35,11 @@ public interface IDatasetDirectoryService extends IJobLifecycleListener, IDatase public void init(ExecutorService executor); public void registerResultPartitionLocation(JobId jobId, ResultSetId rsId, boolean orderedResult, - boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress); + boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) + throws HyracksDataException; - public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition); + public void reportResultPartitionWriteCompletion(JobId jobId, ResultSetId rsId, int partition) + throws HyracksDataException; public void reportResultPartitionFailure(JobId jobId, ResultSetId rsId, int partition); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java index 4e4732d..f51dd06 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/RegisterResultPartitionLocationWork.java @@ -20,6 +20,7 @@ package org.apache.hyracks.control.cc.work; import org.apache.hyracks.api.comm.NetworkAddress; import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.work.AbstractWork; @@ -42,7 +43,8 @@ public class RegisterResultPartitionLocationWork extends AbstractWork { private final NetworkAddress networkAddress; public RegisterResultPartitionLocationWork(ClusterControllerService ccs, JobId jobId, ResultSetId rsId, - boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress networkAddress) { + boolean orderedResult, boolean emptyResult, int partition, int nPartitions, NetworkAddress + networkAddress) { this.ccs = ccs; this.jobId = jobId; this.rsId = rsId; @@ -55,8 +57,13 @@ public class RegisterResultPartitionLocationWork extends AbstractWork { @Override public void run() { - ccs.getDatasetDirectoryService().registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, - partition, nPartitions, networkAddress); + try { + ccs.getDatasetDirectoryService() + .registerResultPartitionLocation(jobId, rsId, orderedResult, emptyResult, partition, nPartitions, + networkAddress); + } catch (HyracksDataException e) { + throw new RuntimeException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java index ffae76a..d63bc8a 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/work/ReportResultPartitionWriteCompletionWork.java @@ -19,6 +19,7 @@ package org.apache.hyracks.control.cc.work; import org.apache.hyracks.api.dataset.ResultSetId; +import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.work.AbstractWork; @@ -42,7 +43,11 @@ public class ReportResultPartitionWriteCompletionWork extends AbstractWork { @Override public void run() { - ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition); + try { + ccs.getDatasetDirectoryService().reportResultPartitionWriteCompletion(jobId, rsId, partition); + } catch (HyracksDataException e) { + throw new RuntimeException(e); + } } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java index 150875b..67b87f5 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/dataset/ResultStateSweeper.java @@ -31,7 +31,6 @@ import org.apache.hyracks.api.job.JobId; * Sweeper to clean up the stale result distribution files and result states. */ public class ResultStateSweeper implements Runnable { - private static final Logger LOGGER = Logger.getLogger(ResultStateSweeper.class.getName()); private final IDatasetManager datasetManager; @@ -39,12 +38,16 @@ public class ResultStateSweeper implements Runnable { private final long resultSweepThreshold; + private final Logger logger; + private final List<JobId> toBeCollected; - public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold) { + public ResultStateSweeper(IDatasetManager datasetManager, long resultTTL, long resultSweepThreshold, + Logger logger) { this.datasetManager = datasetManager; this.resultTTL = resultTTL; this.resultSweepThreshold = resultSweepThreshold; + this.logger = logger; toBeCollected = new ArrayList<JobId>(); } @@ -56,11 +59,10 @@ public class ResultStateSweeper implements Runnable { Thread.sleep(resultSweepThreshold); sweep(); } catch (InterruptedException e) { - LOGGER.severe("Result cleaner thread interrupted, shutting down."); + logger.log(Level.SEVERE, "Result cleaner thread interrupted, shutting down.", e); break; // the interrupt was explicit from another thread. This thread should shut down... } } - } private void sweep() { @@ -75,8 +77,8 @@ public class ResultStateSweeper implements Runnable { datasetManager.deinitState(jobId); } } - if (LOGGER.isLoggable(Level.FINER)) { - LOGGER.finer("Result state cleanup instance successfully completed."); + if (logger.isLoggable(Level.FINER)) { + logger.finer("Result state cleanup instance successfully completed."); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/392bbbc0/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java index 73c680f..3bc549e 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-common/src/main/java/org/apache/hyracks/control/common/utils/ExceptionUtils.java @@ -58,8 +58,12 @@ public class ExceptionUtils { public static void setNodeIds(Collection<Exception> exceptions, String nodeId) { List<Exception> newExceptions = new ArrayList<>(); for (Exception e : exceptions) { - newExceptions.add( - new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(), e, nodeId)); + if (e instanceof HyracksDataException) { + newExceptions.add(HyracksDataException.create((HyracksDataException) e, nodeId)); + } else { + newExceptions.add(new HyracksDataException(ErrorCode.HYRACKS, ErrorCode.FAILURE_ON_NODE, e.getMessage(), + e, nodeId)); + } } exceptions.clear(); exceptions.addAll(newExceptions);
