Repository: asterixdb-bad Updated Branches: refs/heads/master 40b70a18a -> 51819b77a
[ASTERIXDB-2314][HYR] Dataset in class names in Hyracks Change-Id: I333fa410df5efe7da9d4f0e9b7143f9f6928b88b Project: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/commit/51819b77 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/tree/51819b77 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb-bad/diff/51819b77 Branch: refs/heads/master Commit: 51819b77a6068d92eed284624ff6550530b93d05 Parents: 40b70a1 Author: Till Westmann <[email protected]> Authored: Sat Jul 28 09:03:20 2018 -0700 Committer: Till Westmann <[email protected]> Committed: Sat Jul 28 09:03:20 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/asterix/bad/BADJobService.java | 16 ++++++++-------- .../lang/statement/ChannelSubscribeStatement.java | 14 +++++++------- .../bad/lang/statement/CreateChannelStatement.java | 11 ++++++----- .../lang/statement/CreateProcedureStatement.java | 2 +- .../lang/statement/ExecuteProcedureStatement.java | 2 +- .../bad/recovery/BADGlobalRecoveryManager.java | 4 ++-- 6 files changed, 25 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/51819b77/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java index c48ec54..5ec4852 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/BADJobService.java @@ -51,11 +51,11 @@ import org.apache.asterix.metadata.declared.MetadataProvider; import org.apache.asterix.translator.IRequestParameters; import org.apache.asterix.translator.IStatementExecutor; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.api.result.ResultSetId; /** * Provides functionality for channel jobs @@ -122,7 +122,7 @@ public class BADJobService { } public static long runDeployedJobSpec(DeployedJobSpecId distributedId, IHyracksClientConnection hcc, - IHyracksDataset hdc, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory, + IResultSet resultSet, Map<byte[], byte[]> jobParameters, EntityId entityId, ITxnIdFactory txnIdFactory, ICcApplicationContext appCtx, DeployedJobSpecEventListener listener, QueryTranslator statementExecutor) throws Exception { listener.waitWhileAtState(ActivityState.SUSPENDED); @@ -138,7 +138,7 @@ public class BADJobService { long executionMilliseconds = Instant.now().toEpochMilli() - startTime; if (listener.getType() == DeployedJobSpecEventListener.PrecompiledType.QUERY) { - ResultReader resultReader = new ResultReader(hdc, jobId, new ResultSetId(0)); + ResultReader resultReader = new ResultReader(resultSet, jobId, new ResultSetId(0)); ResultUtil.printResults(appCtx, resultReader, statementExecutor.getSessionOutput(), new IStatementExecutor.Stats(), null); @@ -235,7 +235,7 @@ public class BADJobService { //Procedures metadataProvider.setResultSetId(new ResultSetId(0)); IStatementExecutor.ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); - IHyracksDataset hdc = requestParameters.getHyracksDataset(); + IResultSet hdc = requestParameters.getResultSet(); IStatementExecutor.Stats stats = requestParameters.getStats(); boolean resultsAsync = resultDelivery == IStatementExecutor.ResultDelivery.ASYNC || resultDelivery == IStatementExecutor.ResultDelivery.DEFERRED; @@ -272,12 +272,12 @@ public class BADJobService { } private static JobSpecification compileProcedureJob(IStatementExecutor statementExecutor, - MetadataProvider metadataProvider, IHyracksClientConnection hcc, IHyracksDataset hdc, + MetadataProvider metadataProvider, IHyracksClientConnection hcc, IResultSet resultSet, IStatementExecutor.Stats stats, Statement procedureStatement) throws Exception { if (procedureStatement.getKind() == Statement.Kind.INSERT) { return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, - procedureStatement, hcc, hdc, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, null, - null, null); + procedureStatement, hcc, resultSet, IStatementExecutor.ResultDelivery.ASYNC, null, stats, true, + null, null, null); } else if (procedureStatement.getKind() == Statement.Kind.QUERY) { return compileQueryJob(statementExecutor, metadataProvider, hcc, (Query) procedureStatement); } else { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/51819b77/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java index cff1eaa..f6f17f3 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ChannelSubscribeStatement.java @@ -56,9 +56,9 @@ import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataset.IHyracksDataset; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.result.IResultSet; +import org.apache.hyracks.api.result.ResultSetId; public class ChannelSubscribeStatement extends ExtensionStatement { @@ -184,7 +184,7 @@ public class ChannelSubscribeStatement extends ExtensionStatement { tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); final ResultDelivery resultDelivery = requestParameters.getResultProperties().getDelivery(); - final IHyracksDataset hdc = requestParameters.getHyracksDataset(); + final IResultSet resultSet = requestParameters.getResultSet(); final Stats stats = requestParameters.getStats(); if (subscriptionId == null) { //To create a new subscription @@ -207,14 +207,14 @@ public class ChannelSubscribeStatement extends ExtensionStatement { InsertStatement insert = new InsertStatement(new Identifier(dataverse), new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, resultVar, accessor); - ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, hdc, - resultDelivery, null, stats, false, null, null, null); + ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, insert, hcc, + resultSet, resultDelivery, null, stats, false, null, null, null); } else { //To update an existing subscription UpsertStatement upsert = new UpsertStatement(new Identifier(dataverse), new Identifier(subscriptionsDatasetName), subscriptionTuple, varCounter, null, null); - ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, hdc, - resultDelivery, null, stats, false, null, null, null); + ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(tempMdProvider, upsert, hcc, + resultSet, resultDelivery, null, stats, false, null, null, null); } MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/51819b77/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java index edadeb6..0ddb1c3 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateChannelStatement.java @@ -72,9 +72,9 @@ import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataset.IHyracksDataset; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.IResultSet; import org.apache.hyracks.dataflow.common.data.parsers.IValueParser; public class CreateChannelStatement extends ExtensionStatement { @@ -215,7 +215,7 @@ public class CreateChannelStatement extends ExtensionStatement { } private JobSpecification createChannelJob(IStatementExecutor statementExecutor, MetadataProvider metadataProvider, - IHyracksClientConnection hcc, IHyracksDataset hdc, Stats stats) throws Exception { + IHyracksClientConnection hcc, IResultSet resultSet, Stats stats) throws Exception { StringBuilder builder = new StringBuilder(); builder.append("SET inline_with \"false\";\n"); if (!push) { @@ -253,7 +253,7 @@ public class CreateChannelStatement extends ExtensionStatement { (Query) fStatements.get(1)); } return ((QueryTranslator) statementExecutor).handleInsertUpsertStatement(metadataProvider, fStatements.get(1), - hcc, hdc, ResultDelivery.ASYNC, null, stats, true, null, null, null); + hcc, resultSet, ResultDelivery.ASYNC, null, stats, true, null, null, null); } @Override @@ -306,13 +306,14 @@ public class CreateChannelStatement extends ExtensionStatement { MetadataProvider tempMdProvider = new MetadataProvider(metadataProvider.getApplicationContext(), metadataProvider.getDefaultDataverse()); tempMdProvider.getConfig().putAll(metadataProvider.getConfig()); - final IHyracksDataset hdc = requestContext.getHyracksDataset(); + final IResultSet resultSet = requestContext.getResultSet(); final Stats stats = requestContext.getStats(); //Create Channel Datasets createDatasets(statementExecutor, tempMdProvider, hcc); tempMdProvider.getLocks().reset(); //Create Channel Internal Job - JobSpecification channeljobSpec = createChannelJob(statementExecutor, tempMdProvider, hcc, hdc, stats); + JobSpecification channeljobSpec = + createChannelJob(statementExecutor, tempMdProvider, hcc, resultSet, stats); // Now we subscribe if (listener == null) { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/51819b77/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java index 6459b4c..ce8f1d2 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/CreateProcedureStatement.java @@ -73,10 +73,10 @@ import org.apache.asterix.translator.IStatementExecutorContext; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.dataset.ResultSetId; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.job.DeployedJobSpecId; import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.ResultSetId; import org.apache.hyracks.dataflow.common.data.parsers.IValueParser; public class CreateProcedureStatement extends ExtensionStatement { http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/51819b77/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java index 69f413e..2c9d361 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/lang/statement/ExecuteProcedureStatement.java @@ -119,7 +119,7 @@ public class ExecuteProcedureStatement extends ExtensionStatement { Map<byte[], byte[]> contextRuntimeVarMap = createParameterMap(procedure); DeployedJobSpecId deployedJobSpecId = listener.getDeployedJobSpecId(); if (procedure.getDuration().equals("")) { - BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getHyracksDataset(), + BADJobService.runDeployedJobSpec(deployedJobSpecId, hcc, requestParameters.getResultSet(), contextRuntimeVarMap, entityId, metadataProvider.getTxnIdFactory(), appCtx, listener, (QueryTranslator) statementExecutor); http://git-wip-us.apache.org/repos/asf/asterixdb-bad/blob/51819b77/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java index f358986..d2a6613 100644 --- a/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java +++ b/asterix-bad/src/main/java/org/apache/asterix/bad/recovery/BADGlobalRecoveryManager.java @@ -56,7 +56,7 @@ import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.client.dataset.HyracksDataset; +import org.apache.hyracks.client.result.ResultSet; import org.apache.hyracks.control.common.utils.HyracksThreadFactory; public class BADGlobalRecoveryManager extends GlobalRecoveryManager { @@ -146,7 +146,7 @@ public class BADGlobalRecoveryManager extends GlobalRecoveryManager { activeEventHandler.registerListener(listener); BADJobService.redeployJobSpec(entityId, procedure.getBody(), metadataProvider, badStatementExecutor, hcc, new RequestParameters( - new HyracksDataset(hcc, appCtx.getCompilerProperties().getFrameSize(), + new ResultSet(hcc, appCtx.getCompilerProperties().getFrameSize(), ResultReader.NUM_READERS), new ResultProperties(IStatementExecutor.ResultDelivery.IMMEDIATE), new IStatementExecutor.Stats(), null, null, null, null, true),
