This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit cdc36ff0b8fca540d2099d6700c3b647a193d7a6 Author: Dmitry Lychagin <[email protected]> AuthorDate: Fri Jun 10 12:35:22 2022 -0700 [NO ISSUE][COMP] Support for running queries during optimization - user model changes: no - storage format changes: no - interface changes: no Details: - Provide support for running helper queries during optimization - Introduce a new logical ruleset for running sampling queries Change-Id: I457063fef269ae00947169d663d828488c67c2ee Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/16143 Reviewed-by: Ali Alsuliman <[email protected]> Integration-Tests: Jenkins <[email protected]> Tested-by: Jenkins <[email protected]> Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17345 Reviewed-by: Michael Blow <[email protected]> Tested-by: Michael Blow <[email protected]> --- asterixdb/asterix-algebra/pom.xml | 4 + .../compiler/provider/DefaultRuleSetFactory.java | 23 +++- .../asterix/compiler/provider/IRuleSetFactory.java | 20 ++-- .../asterix/optimizer/base/AnalysisUtil.java | 123 ++++++++++++++++++- .../optimizer/base/AsterixOptimizationContext.java | 18 ++- .../optimizer/rules/ConstantFoldingRule.java | 9 +- .../apache/asterix/translator}/ResultMetadata.java | 3 +- .../apache/asterix/api/common/APIFramework.java | 57 +++++---- .../api/http/server/QueryResultApiServlet.java | 2 +- .../asterix/app/result/JobResultCallback.java | 2 +- .../app/result/fields/SignaturePrinter.java | 2 +- .../asterix/app/translator/QueryTranslator.java | 30 +++-- .../common/exceptions/NoOpWarningCollector.java | 21 +--- .../metadata/declared/MetadataProvider.java | 30 ++--- .../asterix/metadata/feeds/FeedMetadataUtil.java | 2 +- .../metadata/utils/ExternalIndexingOperations.java | 2 +- .../algebricks/algebricks-compiler/pom.xml | 5 + .../api/AbstractCompilerFactoryBuilder.java | 39 ++++++- .../api/HeuristicCompilerFactoryBuilder.java | 130 +++++++++++++++------ .../algebricks/compiler/api/ICompilerFactory.java | 4 + .../algebricks/core/algebra/base/Counter.java | 8 ++ .../core/algebra/base/IOptimizationContext.java | 7 ++ .../core/algebra/metadata/IMetadataProvider.java | 9 +- .../physical/DistributeResultPOperator.java | 5 +- .../operators/physical/SinkWritePOperator.java | 2 +- .../algebricks/core/jobgen/impl/JobGenContext.java | 15 +++ .../base/AlgebricksOptimizationContext.java | 103 +++++++++++----- .../rewriter/base/IOptimizationContextFactory.java | 4 +- .../core/rewriter/base/IRuleSetKind.java} | 8 +- .../data/IResultSerializerFactoryProvider.java | 6 +- .../rewriter/rules/PopulateResultMetadataRule.java | 2 +- .../ResultSerializerFactoryProvider.java | 2 +- .../writers/SerializedDataWriterFactory.java | 30 +++-- .../api}/exceptions/NoOpWarningCollector.java | 5 +- .../org/apache/hyracks/test/support/TestUtils.java | 19 +-- 35 files changed, 528 insertions(+), 223 deletions(-) diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml index 7a32f42d42..f542af3145 100644 --- a/asterixdb/asterix-algebra/pom.xml +++ b/asterixdb/asterix-algebra/pom.xml @@ -201,6 +201,10 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-client</artifactId> + </dependency> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java index ba42036ac5..76404ffde1 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/DefaultRuleSetFactory.java @@ -23,22 +23,32 @@ import java.util.List; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.optimizer.base.RuleCollections; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFirstRuleCheckFixpointRuleController; import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialFixpointRuleController; import org.apache.hyracks.algebricks.compiler.rewriter.rulecontrollers.SequentialOnceRuleController; import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; public class DefaultRuleSetFactory implements IRuleSetFactory { @Override public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites( - ICcApplicationContext appCtx) throws AlgebricksException { + ICcApplicationContext appCtx) { return buildLogical(appCtx); } + @Override + public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(IRuleSetKind ruleSetKind, + ICcApplicationContext appCtx) { + if (ruleSetKind == RuleSetKind.SAMPLING) { + return buildLogicalSampling(); + } else { + throw new IllegalArgumentException(String.valueOf(ruleSetKind)); + } + } + @Override public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites( ICcApplicationContext appCtx) { @@ -76,6 +86,14 @@ public class DefaultRuleSetFactory implements IRuleSetFactory { return defaultLogicalRewrites; } + public static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildLogicalSampling() { + List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites = new ArrayList<>(); + SequentialFixpointRuleController seqCtrlNoDfs = new SequentialFixpointRuleController(false); + logicalRewrites.add(new Pair<>(seqCtrlNoDfs, RuleCollections.buildConsolidationRuleCollection())); + logicalRewrites.add(new Pair<>(seqCtrlNoDfs, RuleCollections.buildPlanCleanupRuleCollection())); + return logicalRewrites; + } + public static List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> buildPhysical( ICcApplicationContext appCtx) { List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> defaultPhysicalRewrites = new ArrayList<>(); @@ -88,5 +106,4 @@ public class DefaultRuleSetFactory implements IRuleSetFactory { defaultPhysicalRewrites.add(new Pair<>(seqOnceCtrl, RuleCollections.prepareForJobGenRuleCollection())); return defaultPhysicalRewrites; } - } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java index 2300e4a7ec..c643d21450 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/compiler/provider/IRuleSetFactory.java @@ -21,24 +21,30 @@ package org.apache.asterix.compiler.provider; import java.util.List; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; public interface IRuleSetFactory { + enum RuleSetKind implements IRuleSetKind { + SAMPLING + } + /** * @return the logical rewrites - * @throws AlgebricksException */ - public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites( - ICcApplicationContext appCtx) throws AlgebricksException; + List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(ICcApplicationContext appCtx); /** - * @return the physical rewrites + * @return the logical rewrites of the specified kind */ - public List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites( - ICcApplicationContext appCtx) throws AlgebricksException; + List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getLogicalRewrites(IRuleSetKind ruleSetKind, + ICcApplicationContext appCtx); + /** + * @return the physical rewrites + */ + List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> getPhysicalRewrites(ICcApplicationContext appCtx); } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java index 8b5c8bc0b8..b6f79d9924 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AnalysisUtil.java @@ -18,32 +18,70 @@ */ package org.apache.asterix.optimizer.base; +import java.io.DataInputStream; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.metadata.DataverseName; +import org.apache.asterix.common.transactions.TxnId; +import org.apache.asterix.common.utils.JobUtils; +import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.metadata.declared.DataSourceId; +import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.declared.ResultSetDataSink; +import org.apache.asterix.metadata.declared.ResultSetSinkId; +import org.apache.asterix.om.base.IAObject; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.optimizer.rules.am.AccessMethodUtils; +import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; +import org.apache.asterix.translator.ResultMetadata; +import org.apache.asterix.translator.SessionConfig; import org.apache.commons.lang3.mutable.Mutable; +import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; +import org.apache.hyracks.algebricks.compiler.api.ICompiler; +import org.apache.hyracks.algebricks.compiler.api.ICompilerFactory; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.base.LogicalExpressionTag; import org.apache.hyracks.algebricks.core.algebra.base.LogicalOperatorTag; +import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression; +import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; +import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractDataSourceOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.AbstractLogicalOperator; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.DistributeResultOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.OrderOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.UnnestMapOperator; import org.apache.hyracks.algebricks.core.algebra.operators.logical.WindowOperator; +import org.apache.hyracks.algebricks.core.algebra.plan.ALogicalPlanImpl; +import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; +import org.apache.hyracks.api.comm.IFrame; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; +import org.apache.hyracks.api.job.JobId; +import org.apache.hyracks.api.job.JobSpecification; +import org.apache.hyracks.api.result.IResultSetReader; +import org.apache.hyracks.api.result.ResultSetId; +import org.apache.hyracks.control.nc.resources.memory.FrameManager; +import org.apache.hyracks.data.std.util.ByteArrayAccessibleInputStream; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; public class AnalysisUtil { + + private static final List<FunctionIdentifier> fieldAccessFunctions = + Arrays.asList(BuiltinFunctions.GET_DATA, BuiltinFunctions.GET_HANDLE, BuiltinFunctions.TYPE_OF); + /* * If the first child of op is of type opType, then it returns that child, * o/w returns null. @@ -204,11 +242,86 @@ public class AnalysisUtil { return true; } - private static List<FunctionIdentifier> fieldAccessFunctions = new ArrayList<>(); + public static List<List<IAObject>> runQuery(Mutable<ILogicalOperator> topOp, List<LogicalVariable> resultVars, + IOptimizationContext queryOptCtx, IRuleSetKind ruleSetKind) throws AlgebricksException { + + MetadataProvider metadataProvider = (MetadataProvider) queryOptCtx.getMetadataProvider(); + ICcApplicationContext appCtx = metadataProvider.getApplicationContext(); + TxnId mainTxnId = metadataProvider.getTxnId(); + try { + TxnId newTxnId = metadataProvider.getTxnIdFactory().create(); + metadataProvider.setTxnId(newTxnId); + + IVariableTypeEnvironment topOpTypeEnv = queryOptCtx.getOutputTypeEnvironment(topOp.getValue()); + SerializerDeserializerProvider serdeProvider = SerializerDeserializerProvider.INSTANCE; + + int nFields = resultVars.size(); + List<Mutable<ILogicalExpression>> resultExprList = new ArrayList<>(nFields); + List<ISerializerDeserializer<?>> resultSerdeList = new ArrayList<>(nFields); + + for (LogicalVariable var : resultVars) { + Object varType = topOpTypeEnv.getVarType(var); + if (varType == null) { + throw new IllegalArgumentException("Cannot determine type of " + var); + } + resultSerdeList.add(serdeProvider.getSerializerDeserializer(varType)); + resultExprList.add(new MutableObject<>(new VariableReferenceExpression(var))); + } + + ResultMetadata resultMetadata = new ResultMetadata(SessionConfig.OutputFormat.ADM); + ResultSetId resultSetId = new ResultSetId(metadataProvider.getResultSetIdCounter().getAndInc()); + ResultSetSinkId rssId = new ResultSetSinkId(resultSetId); + ResultSetDataSink sink = new ResultSetDataSink(rssId, null); + + DistributeResultOperator resultOp = new DistributeResultOperator(resultExprList, sink, resultMetadata); + resultOp.getInputs().add(topOp); + queryOptCtx.computeAndSetTypeEnvironmentForOperator(resultOp); + + MutableObject<ILogicalOperator> newResultOpRef = new MutableObject<>(resultOp); - static { - fieldAccessFunctions.add(BuiltinFunctions.GET_DATA); - fieldAccessFunctions.add(BuiltinFunctions.GET_HANDLE); - fieldAccessFunctions.add(BuiltinFunctions.TYPE_OF); + ICompilerFactory compilerFactory = (ICompilerFactory) queryOptCtx.getCompilerFactory(); + ICompiler compiler = + compilerFactory.createCompiler(new ALogicalPlanImpl(newResultOpRef), queryOptCtx, ruleSetKind); + compiler.optimize(); + + JobSpecification jobSpec = compiler.createJob(appCtx, new JobEventListenerFactory(newTxnId, false)); + + JobId jobId = JobUtils.runJob(appCtx.getHcc(), jobSpec, true); + + IResultSetReader resultSetReader = appCtx.getResultSet().createReader(jobId, resultSetId); + FrameManager frameManager = new FrameManager(queryOptCtx.getPhysicalOptimizationConfig().getFrameSize()); + IFrame frame = new VSizeFrame(frameManager); + + FrameTupleAccessor fta = new FrameTupleAccessor(null); + ByteArrayAccessibleInputStream bais = new ByteArrayAccessibleInputStream(frame.getBuffer().array(), 0, 0); + DataInputStream dis = new DataInputStream(bais); + List<List<IAObject>> result = new ArrayList<>(); + + while (resultSetReader.read(frame) > 0) { + ByteBuffer buffer = frame.getBuffer(); + fta.reset(buffer); + int nTuples = fta.getTupleCount(); + for (int tupleIdx = 0; tupleIdx < nTuples; tupleIdx++) { + int tupleStart = fta.getTupleStartOffset(tupleIdx); + int tupleEnd = fta.getTupleEndOffset(tupleIdx); + bais.setContent(buffer.array(), tupleStart, tupleEnd - tupleStart); + + List<IAObject> values = new ArrayList<>(nFields); + for (int fieldIdx = 0; fieldIdx < nFields; fieldIdx++) { + IAObject value = (IAObject) resultSerdeList.get(fieldIdx).deserialize(dis); + values.add(value); + } + result.add(values); + } + } + + return result; + } catch (AlgebricksException e) { + throw e; + } catch (Exception e) { + throw new AlgebricksException(e); + } finally { + metadataProvider.setTxnId(mainTxnId); + } } } diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java index 51c03eb5cd..7b2dd7a9b1 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/base/AsterixOptimizationContext.java @@ -31,6 +31,7 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.IMergeAggregationE import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeComputer; import org.apache.hyracks.algebricks.core.algebra.prettyprint.IPlanPrettyPrinter; import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext; +import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.exceptions.IWarningCollector; @@ -39,17 +40,24 @@ import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap; public final class AsterixOptimizationContext extends AlgebricksOptimizationContext { - private final Int2ObjectMap<Set<DataSource>> dataSourceMap = new Int2ObjectOpenHashMap<>(); + private final Int2ObjectOpenHashMap<Set<DataSource>> dataSourceMap; - public AsterixOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer, + public AsterixOptimizationContext(IOptimizationContextFactory optContextFactory, int varCounter, + IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer, IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, IPlanPrettyPrinter prettyPrinter, IWarningCollector warningCollector) { - super(varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, - nullableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig, clusterLocations, - prettyPrinter, warningCollector); + super(optContextFactory, varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, + expressionTypeComputer, nullableTypeComputer, conflictingTypeResovler, physicalOptimizationConfig, + clusterLocations, prettyPrinter, warningCollector); + dataSourceMap = new Int2ObjectOpenHashMap<>(); + } + + public AsterixOptimizationContext(AsterixOptimizationContext from) { + super(from); + dataSourceMap = from.dataSourceMap.clone(); } public void addDataSource(DataSource dataSource) { diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java index 10aedc766a..92e9ddabed 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/ConstantFoldingRule.java @@ -29,7 +29,6 @@ import org.apache.asterix.common.config.GlobalConfig; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; -import org.apache.asterix.common.exceptions.NoOpWarningCollector; import org.apache.asterix.common.exceptions.WarningCollector; import org.apache.asterix.dataflow.data.common.ExpressionTypeComputer; import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory; @@ -90,11 +89,14 @@ import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConf import org.apache.hyracks.algebricks.runtime.base.IEvaluatorContext; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator; import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; +import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.NoOpWarningCollector; import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.data.std.api.IPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; @@ -151,8 +153,9 @@ public class ConstantFoldingRule implements IAlgebraicRewriteRule { jobGenCtx = new JobGenContext(null, metadataProvider, appCtx, SerializerDeserializerProvider.INSTANCE, BinaryHashFunctionFactoryProvider.INSTANCE, BinaryHashFunctionFamilyProvider.INSTANCE, BinaryComparatorFactoryProvider.INSTANCE, TypeTraitProvider.INSTANCE, BinaryBooleanInspector.FACTORY, - BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, - NullWriterFactory.INSTANCE, UnnestingPositionWriterFactory.INSTANCE, null, + BinaryIntegerInspector.FACTORY, ADMPrinterFactoryProvider.INSTANCE, PrinterBasedWriterFactory.INSTANCE, + ResultSerializerFactoryProvider.INSTANCE, MissingWriterFactory.INSTANCE, NullWriterFactory.INSTANCE, + UnnestingPositionWriterFactory.INSTANCE, null, new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager())), ExpressionTypeComputer.INSTANCE, null, null, null, null, GlobalConfig.DEFAULT_FRAME_SIZE, null, NoOpWarningCollector.INSTANCE, 0, new PhysicalOptimizationConfig()); diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java similarity index 97% rename from asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java rename to asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java index 94360a1204..78f84ffaa5 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/ResultMetadata.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/translator/ResultMetadata.java @@ -16,12 +16,11 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.api.common; +package org.apache.asterix.translator; import java.util.List; import java.util.Set; -import org.apache.asterix.translator.SessionConfig; import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.api.result.IResultMetadata; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java index ce86d24c60..fcac5b45a1 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/APIFramework.java @@ -40,6 +40,7 @@ import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.api.IResponsePrinter; import org.apache.asterix.common.config.CompilerProperties; import org.apache.asterix.common.config.OptimizationConfUtil; +import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; @@ -73,6 +74,7 @@ import org.apache.asterix.runtime.job.listener.JobEventListenerFactory; import org.apache.asterix.translator.CompiledStatements.ICompiledDmlStatement; import org.apache.asterix.translator.ExecutionPlans; import org.apache.asterix.translator.IRequestParameters; +import org.apache.asterix.translator.ResultMetadata; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; import org.apache.asterix.utils.ResourceUtils; @@ -97,6 +99,8 @@ import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter; import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider; +import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; +import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; import org.apache.hyracks.api.client.IClusterInfoCollector; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.client.NodeControllerInfo; @@ -154,11 +158,16 @@ public class APIFramework { IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector) { IPlanPrettyPrinter prettyPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter(); - return new AsterixOptimizationContext(varCounter, expressionEvalSizeComputer, + return new AsterixOptimizationContext(this, varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrinter, warningCollector); } + + @Override + public IOptimizationContext cloneOptimizationContext(IOptimizationContext oc) { + return new AsterixOptimizationContext((AsterixOptimizationContext) oc); + } } public Pair<IReturningStatement, Integer> reWriteQuery(List<FunctionDecl> declaredFunctions, @@ -210,7 +219,8 @@ public class APIFramework { && conf.is(SessionConfig.OOB_LOGICAL_PLAN)) { generateLogicalPlan(plan, output.config().getPlanFormat()); } - CompilerProperties compilerProperties = metadataProvider.getApplicationContext().getCompilerProperties(); + ICcApplicationContext ccAppContext = metadataProvider.getApplicationContext(); + CompilerProperties compilerProperties = ccAppContext.getCompilerProperties(); Map<String, Object> querySpecificConfig = validateConfig(metadataProvider.getConfig(), sourceLoc); final PhysicalOptimizationConfig physOptConf = OptimizationConfUtil.createPhysicalOptimizationConf(compilerProperties, querySpecificConfig, sourceLoc); @@ -218,8 +228,9 @@ public class APIFramework { HeuristicCompilerFactoryBuilder builder = new HeuristicCompilerFactoryBuilder(OptimizationContextFactory.INSTANCE); builder.setPhysicalOptimizationConfig(physOptConf); - builder.setLogicalRewrites(ruleSetFactory.getLogicalRewrites(metadataProvider.getApplicationContext())); - builder.setPhysicalRewrites(ruleSetFactory.getPhysicalRewrites(metadataProvider.getApplicationContext())); + builder.setLogicalRewrites(() -> ruleSetFactory.getLogicalRewrites(ccAppContext)); + builder.setLogicalRewritesByKind(kind -> ruleSetFactory.getLogicalRewrites(kind, ccAppContext)); + builder.setPhysicalRewrites(() -> ruleSetFactory.getPhysicalRewrites(ccAppContext)); IDataFormat format = metadataProvider.getDataFormat(); ICompilerFactory compilerFactory = builder.create(); builder.setExpressionEvalSizeComputer(format.getExpressionEvalSizeComputer()); @@ -237,6 +248,24 @@ public class APIFramework { chooseLocations(clusterInfoCollector, parallelism, metadataProvider.getClusterLocations()); builder.setClusterLocations(computationLocations); + builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory()); + builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory()); + builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider()); + builder.setExpressionRuntimeProvider( + new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager()))); + builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider()); + builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider()); + builder.setMissingWriterFactory(format.getMissingWriterFactory()); + builder.setNullWriterFactory(format.getNullWriterFactory()); + builder.setUnnestingPositionWriterFactory(format.getUnnestingPositionWriterFactory()); + builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider()); + builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt())); + builder.setWriterFactory(PrinterBasedWriterFactory.INSTANCE); + builder.setResultSerializerFactoryProvider(ResultSerializerFactoryProvider.INSTANCE); + builder.setSerializerDeserializerProvider(format.getSerdeProvider()); + builder.setTypeTraitProvider(format.getTypeTraitProvider()); + builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider()); + ICompiler compiler = compilerFactory.createCompiler(plan, metadataProvider, t.getVarCounter()); if (conf.isOptimize()) { compiler.optimize(); @@ -285,32 +314,16 @@ public class APIFramework { return null; } - builder.setBinaryBooleanInspectorFactory(format.getBinaryBooleanInspectorFactory()); - builder.setBinaryIntegerInspectorFactory(format.getBinaryIntegerInspectorFactory()); - builder.setComparatorFactoryProvider(format.getBinaryComparatorFactoryProvider()); - builder.setExpressionRuntimeProvider( - new ExpressionRuntimeProvider(new QueryLogicalExpressionJobGen(metadataProvider.getFunctionManager()))); - builder.setHashFunctionFactoryProvider(format.getBinaryHashFunctionFactoryProvider()); - builder.setHashFunctionFamilyProvider(format.getBinaryHashFunctionFamilyProvider()); - builder.setMissingWriterFactory(format.getMissingWriterFactory()); - builder.setNullWriterFactory(format.getNullWriterFactory()); - builder.setUnnestingPositionWriterFactory(format.getUnnestingPositionWriterFactory()); - builder.setPredicateEvaluatorFactoryProvider(format.getPredicateEvaluatorFactoryProvider()); - builder.setPrinterProvider(getPrinterFactoryProvider(format, conf.fmt())); - builder.setSerializerDeserializerProvider(format.getSerdeProvider()); - builder.setTypeTraitProvider(format.getTypeTraitProvider()); - builder.setNormalizedKeyComputerFactoryProvider(format.getNormalizedKeyComputerFactoryProvider()); - JobEventListenerFactory jobEventListenerFactory = new JobEventListenerFactory(txnId, metadataProvider.isWriteTransaction()); - JobSpecification spec = compiler.createJob(metadataProvider.getApplicationContext(), jobEventListenerFactory); + JobSpecification spec = compiler.createJob(ccAppContext, jobEventListenerFactory); if (isQuery) { if (requestParameters == null || !requestParameters.isSkipAdmissionPolicy()) { // Sets a required capacity, only for read-only queries. // DDLs and DMLs are considered not that frequent. // limit the computation locations to the locations that will be used in the query - final INodeJobTracker nodeJobTracker = metadataProvider.getApplicationContext().getNodeJobTracker(); + final INodeJobTracker nodeJobTracker = ccAppContext.getNodeJobTracker(); final AlgebricksAbsolutePartitionConstraint jobLocations = getJobLocations(spec, nodeJobTracker, computationLocations); final IClusterCapacity jobRequiredCapacity = diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java index 981cdc901a..bfebfd6c91 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/QueryResultApiServlet.java @@ -21,7 +21,6 @@ package org.apache.asterix.api.http.server; import java.io.IOException; import java.util.concurrent.ConcurrentMap; -import org.apache.asterix.api.common.ResultMetadata; import org.apache.asterix.app.result.ResponseMetrics; import org.apache.asterix.app.result.ResponsePrinter; import org.apache.asterix.app.result.ResultHandle; @@ -31,6 +30,7 @@ import org.apache.asterix.app.result.fields.ProfilePrinter; import org.apache.asterix.app.result.fields.ResultsPrinter; import org.apache.asterix.common.api.IApplicationContext; import org.apache.asterix.translator.IStatementExecutor.Stats; +import org.apache.asterix.translator.ResultMetadata; import org.apache.asterix.translator.SessionConfig; import org.apache.asterix.translator.SessionOutput; import org.apache.hyracks.api.exceptions.ErrorCode; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java index 05073f9e04..201a4700a6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/JobResultCallback.java @@ -23,8 +23,8 @@ import java.util.HashSet; import java.util.Iterator; import java.util.Set; -import org.apache.asterix.api.common.ResultMetadata; import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.translator.ResultMetadata; import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.api.job.JobFlag; import org.apache.hyracks.api.job.JobId; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java index 048584c713..3f1e632dff 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/fields/SignaturePrinter.java @@ -22,7 +22,6 @@ import java.io.PrintWriter; import java.util.LinkedHashSet; import java.util.List; -import org.apache.asterix.api.common.ResultMetadata; import org.apache.asterix.api.http.server.ResultUtil; import org.apache.asterix.common.annotations.IRecordTypeAnnotation; import org.apache.asterix.common.annotations.RecordFieldOrderAnnotation; @@ -34,6 +33,7 @@ import org.apache.asterix.om.types.AUnionType; import org.apache.asterix.om.types.BuiltinType; import org.apache.asterix.om.types.IAType; import org.apache.asterix.translator.ExecutionPlans; +import org.apache.asterix.translator.ResultMetadata; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.util.JSONUtil; diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index f901f525b2..8eb13e15db 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -235,13 +235,11 @@ import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; +import org.apache.hyracks.algebricks.core.algebra.base.Counter; import org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression.FunctionKind; import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.util.OperatorPropertiesUtil; import org.apache.hyracks.algebricks.data.IAWriterFactory; -import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; -import org.apache.hyracks.algebricks.runtime.serializer.ResultSerializerFactoryProvider; -import org.apache.hyracks.algebricks.runtime.writers.PrinterBasedWriterFactory; import org.apache.hyracks.api.client.IClusterInfoCollector; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -330,10 +328,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen public void compileAndExecute(IHyracksClientConnection hcc, IRequestParameters requestParameters) throws Exception { validateStatements(requestParameters); trackRequest(requestParameters); - int resultSetIdCounter = 0; + Counter resultSetIdCounter = new Counter(0); FileSplit outputFile = null; - IAWriterFactory writerFactory = PrinterBasedWriterFactory.INSTANCE; - IResultSerializerFactoryProvider resultSerializerFactoryProvider = ResultSerializerFactoryProvider.INSTANCE; String threadName = Thread.currentThread().getName(); Thread.currentThread().setName( QueryTranslator.class.getSimpleName() + ":" + requestParameters.getRequestReference().getUuid()); @@ -353,8 +349,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } validateOperation(appCtx, activeDataverse, stmt); MetadataProvider metadataProvider = MetadataProvider.create(appCtx, activeDataverse); - configureMetadataProvider(metadataProvider, config, resultSerializerFactoryProvider, writerFactory, - outputFile, requestParameters, stmt); + configureMetadataProvider(metadataProvider, config, resultSetIdCounter, outputFile, requestParameters, + stmt); IStatementRewriter stmtRewriter = rewriterFactory.createStatementRewriter(); rewriteStatement(stmt, stmtRewriter, metadataProvider); // Rewrite the statement's AST. Statement.Kind kind = stmt.getKind(); @@ -444,7 +440,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen case INSERT: case UPSERT: if (((InsertStatement) stmt).getReturnExpression() != null) { - metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); + metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter.getAndInc())); metadataProvider.setResultAsyncMode(resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); metadataProvider.setMaxResultReads(maxResultReads); @@ -480,7 +476,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen handleCreateFeedPolicyStatement(metadataProvider, stmt); break; case QUERY: - metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter++)); + metadataProvider.setResultSetId(new ResultSetId(resultSetIdCounter.getAndInc())); metadataProvider.setResultAsyncMode( resultDelivery == ResultDelivery.ASYNC || resultDelivery == ResultDelivery.DEFERRED); metadataProvider.setMaxResultReads(maxResultReads); @@ -506,7 +502,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen final ExtensionStatement extStmt = (ExtensionStatement) stmt; statementProperties.setName(extStmt.getName()); if (!isCompileOnly()) { - extStmt.handle(hcc, this, requestParameters, metadataProvider, resultSetIdCounter); + extStmt.handle(hcc, this, requestParameters, metadataProvider, + resultSetIdCounter.getAndInc()); } break; default: @@ -524,14 +521,13 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } protected void configureMetadataProvider(MetadataProvider metadataProvider, Map<String, String> config, - IResultSerializerFactoryProvider resultSerializerFactoryProvider, IAWriterFactory writerFactory, - FileSplit outputFile, IRequestParameters requestParameters, Statement statement) { + Counter resultSetIdCounter, FileSplit outputFile, IRequestParameters requestParameters, + Statement statement) { if (statement.getKind() == Statement.Kind.QUERY && requestParameters.isSQLCompatMode()) { metadataProvider.getConfig().put(SqlppQueryRewriter.SQL_COMPAT_OPTION, Boolean.TRUE.toString()); } metadataProvider.getConfig().putAll(config); - metadataProvider.setWriterFactory(writerFactory); - metadataProvider.setResultSerializerFactoryProvider(resultSerializerFactoryProvider); + metadataProvider.setResultSetIdCounter(resultSetIdCounter); metadataProvider.setOutputFile(outputFile); } @@ -4375,8 +4371,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen private void updateJobStats(JobId jobId, Stats stats, ResultSetId rsId) throws HyracksDataException { final ClusterControllerService controllerService = (ClusterControllerService) appCtx.getServiceContext().getControllerService(); - org.apache.asterix.api.common.ResultMetadata resultMetadata = - (org.apache.asterix.api.common.ResultMetadata) controllerService.getResultDirectoryService() + org.apache.asterix.translator.ResultMetadata resultMetadata = + (org.apache.asterix.translator.ResultMetadata) controllerService.getResultDirectoryService() .getResultMetadata(jobId, rsId); stats.setProcessedObjects(resultMetadata.getProcessedObjects()); if (jobFlags.contains(JobFlag.PROFILE_RUNTIME)) { diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java index a036b7ef93..6ac805bf8c 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java @@ -20,27 +20,10 @@ package org.apache.asterix.common.exceptions; import org.apache.hyracks.api.exceptions.IWarningCollector; -import org.apache.hyracks.api.exceptions.Warning; -public final class NoOpWarningCollector implements IWarningCollector { - - public static final IWarningCollector INSTANCE = new NoOpWarningCollector(); +public final class NoOpWarningCollector { + public static final IWarningCollector INSTANCE = org.apache.hyracks.api.exceptions.NoOpWarningCollector.INSTANCE; private NoOpWarningCollector() { } - - @Override - public void warn(Warning warning) { - // no-op - } - - @Override - public boolean shouldWarn() { - return false; - } - - @Override - public long getTotalWarningsCount() { - return 0; - } } diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java index 4cc83abe97..ca8dd10601 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java @@ -115,6 +115,7 @@ import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConst import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.common.utils.Triple; +import org.apache.hyracks.algebricks.core.algebra.base.Counter; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionRuntimeProvider; @@ -182,12 +183,11 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> private Dataverse defaultDataverse; private MetadataTransactionContext mdTxnCtx; private boolean isWriteTransaction; - private IAWriterFactory writerFactory; private FileSplit outputFile; private boolean asyncResults; private long maxResultReads; private ResultSetId resultSetId; - private IResultSerializerFactoryProvider resultSerializerFactoryProvider; + private Counter resultSetIdCounter; private TxnId txnId; private Map<String, Integer> externalDataLocks; private boolean blockingOperatorDisabled = false; @@ -262,10 +262,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> this.isWriteTransaction = writeTransaction; } - public void setWriterFactory(IAWriterFactory writerFactory) { - this.writerFactory = writerFactory; - } - public void setMetadataTxnContext(MetadataTransactionContext mdTxnCtx) { this.mdTxnCtx = mdTxnCtx; } @@ -274,10 +270,6 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> return mdTxnCtx; } - public IAWriterFactory getWriterFactory() { - return this.writerFactory; - } - public FileSplit getOutputFile() { return outputFile; } @@ -310,12 +302,12 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> this.resultSetId = resultSetId; } - public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider rafp) { - this.resultSerializerFactoryProvider = rafp; + public Counter getResultSetIdCounter() { + return resultSetIdCounter; } - public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() { - return resultSerializerFactoryProvider; + public void setResultSetIdCounter(Counter resultSetIdCounter) { + this.resultSetIdCounter = resultSetIdCounter; } public boolean isWriteTransaction() { @@ -683,7 +675,8 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> @Override public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, - int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) { + int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, + RecordDescriptor inputDesc) { FileSplitDataSink fsds = (FileSplitDataSink) sink; FileSplitSinkId fssi = fsds.getId(); FileSplit fs = fssi.getFileSplit(); @@ -691,14 +684,15 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> String nodeId = fs.getNodeName(); SinkWriterRuntimeFactory runtime = - new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, getWriterFactory(), inputDesc); + new SinkWriterRuntimeFactory(printColumns, printerFactories, outFile, writerFactory, inputDesc); AlgebricksPartitionConstraint apc = new AlgebricksAbsolutePartitionConstraint(new String[] { nodeId }); return new Pair<>(runtime, apc); } @Override public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, - int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, + int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, + IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc, IResultMetadata metadata, JobSpecification spec) throws AlgebricksException { ResultSetDataSink rsds = (ResultSetDataSink) sink; ResultSetSinkId rssId = rsds.getId(); @@ -706,7 +700,7 @@ public class MetadataProvider implements IMetadataProvider<DataSourceId, String> ResultWriterOperatorDescriptor resultWriter = null; try { IResultSerializerFactory resultSerializedAppenderFactory = resultSerializerFactoryProvider - .getAqlResultSerializerFactoryProvider(printColumns, printerFactories, getWriterFactory()); + .getResultSerializerFactoryProvider(printColumns, printerFactories, writerFactory); resultWriter = new ResultWriterOperatorDescriptor(spec, rsId, metadata, getResultAsyncMode(), resultSerializedAppenderFactory, getMaxResultReads()); } catch (IOException e) { diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java index 92390a766a..0ee9516f3c 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/feeds/FeedMetadataUtil.java @@ -30,7 +30,6 @@ import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.MetadataException; -import org.apache.asterix.common.exceptions.NoOpWarningCollector; import org.apache.asterix.common.external.IDataSourceAdapter; import org.apache.asterix.common.external.IDataSourceAdapter.AdapterType; import org.apache.asterix.common.functions.ExternalFunctionLanguage; @@ -62,6 +61,7 @@ import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.NoOpWarningCollector; /** * A utility class for providing helper functions for feeds TODO: Refactor this diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java index b899e16e8b..c6eafe0cf4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/ExternalIndexingOperations.java @@ -30,7 +30,6 @@ import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.TransactionState; import org.apache.asterix.common.context.IStorageComponentProvider; -import org.apache.asterix.common.exceptions.NoOpWarningCollector; import org.apache.asterix.external.api.ITypedAdapterFactory; import org.apache.asterix.external.indexing.ExternalFile; import org.apache.asterix.external.indexing.IndexingConstants; @@ -64,6 +63,7 @@ import org.apache.hyracks.algebricks.core.jobgen.impl.ConnectorPolicyAssignmentP import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.NoOpWarningCollector; import org.apache.hyracks.api.exceptions.SourceLocation; import org.apache.hyracks.api.job.JobSpecification; import org.apache.hyracks.dataflow.std.file.IFileSplitProvider; diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml index fbe9dbc67d..f557d7af41 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml +++ b/hyracks-fullstack/algebricks/algebricks-compiler/pom.xml @@ -57,6 +57,11 @@ <artifactId>algebricks-data</artifactId> <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>algebricks-runtime</artifactId> + <version>${project.version}</version> + </dependency> <dependency> <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-api</artifactId> diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java index c22d54d944..3a2fe3bbbd 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/AbstractCompilerFactoryBuilder.java @@ -19,6 +19,8 @@ package org.apache.hyracks.algebricks.compiler.api; import java.util.List; +import java.util.function.Function; +import java.util.function.Supplier; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; @@ -32,7 +34,9 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeCompu import org.apache.hyracks.algebricks.core.algebra.expressions.IPartialAggregationTypeComputer; import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.IAWriterFactory; import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider; @@ -40,6 +44,7 @@ import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider; import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider; +import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory; @@ -49,8 +54,9 @@ import org.apache.hyracks.api.exceptions.IWarningCollector; public abstract class AbstractCompilerFactoryBuilder { - protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites; - protected List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites; + protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites; + protected Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind; + protected Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites; protected ITypeTraitProvider typeTraitProvider; protected ISerializerDeserializerProvider serializerDeserializerProvider; protected IBinaryHashFunctionFactoryProvider hashFunctionFactoryProvider; @@ -59,6 +65,8 @@ public abstract class AbstractCompilerFactoryBuilder { protected IBinaryBooleanInspectorFactory binaryBooleanInspectorFactory; protected IBinaryIntegerInspectorFactory binaryIntegerInspectorFactory; protected IPrinterFactoryProvider printerProvider; + protected IAWriterFactory writerFactory; + protected IResultSerializerFactoryProvider resultSerializerFactoryProvider; protected IPredicateEvaluatorFactoryProvider predEvaluatorFactoryProvider; protected IExpressionRuntimeProvider expressionRuntimeProvider; protected IExpressionTypeComputer expressionTypeComputer; @@ -78,11 +86,18 @@ public abstract class AbstractCompilerFactoryBuilder { public abstract ICompilerFactory create(); - public void setLogicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites) { + public void setLogicalRewrites( + Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewrites) { this.logicalRewrites = logicalRewrites; } - public void setPhysicalRewrites(List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites) { + public void setLogicalRewritesByKind( + Function<IRuleSetKind, List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> logicalRewritesByKind) { + this.logicalRewritesByKind = logicalRewritesByKind; + } + + public void setPhysicalRewrites( + Supplier<List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>>> physicalRewrites) { this.physicalRewrites = physicalRewrites; } @@ -158,6 +173,22 @@ public abstract class AbstractCompilerFactoryBuilder { return printerProvider; } + public void setWriterFactory(IAWriterFactory writerFactory) { + this.writerFactory = writerFactory; + } + + public IAWriterFactory getWriterFactory() { + return writerFactory; + } + + public void setResultSerializerFactoryProvider(IResultSerializerFactoryProvider resultSerializerFactoryProvider) { + this.resultSerializerFactoryProvider = resultSerializerFactoryProvider; + } + + public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() { + return resultSerializerFactoryProvider; + } + public void setExpressionRuntimeProvider(IExpressionRuntimeProvider expressionRuntimeProvider) { this.expressionRuntimeProvider = expressionRuntimeProvider; } diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java index 891980fde2..e35a539952 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/HeuristicCompilerFactoryBuilder.java @@ -18,8 +18,11 @@ */ package org.apache.hyracks.algebricks.compiler.api; +import java.util.List; + import org.apache.hyracks.algebricks.common.constraints.AlgebricksPartitionConstraint; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.expressions.IConflictingTypeResolver; @@ -33,10 +36,15 @@ import org.apache.hyracks.algebricks.core.algebra.prettyprint.PlanPrettyPrinter; import org.apache.hyracks.algebricks.core.config.AlgebricksConfig; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.PlanCompiler; +import org.apache.hyracks.algebricks.core.rewriter.base.AbstractRuleController; import org.apache.hyracks.algebricks.core.rewriter.base.AlgebricksOptimizationContext; import org.apache.hyracks.algebricks.core.rewriter.base.HeuristicOptimizer; +import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory; +import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.IAWriterFactory; +import org.apache.hyracks.algebricks.runtime.writers.SerializedDataWriterFactory; import org.apache.hyracks.api.exceptions.IWarningCollector; import org.apache.hyracks.api.job.IJobletEventListenerFactory; import org.apache.hyracks.api.job.JobSpecification; @@ -58,14 +66,19 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil IConflictingTypeResolver conflictingTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector) { IPlanPrettyPrinter prettyPrinter = PlanPrettyPrinter.createStringPlanPrettyPrinter(); - return new AlgebricksOptimizationContext(varCounter, expressionEvalSizeComputer, + return new AlgebricksOptimizationContext(this, varCounter, expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, prettyPrinter, warningCollector); } + + @Override + public IOptimizationContext cloneOptimizationContext(IOptimizationContext oc) { + return new AlgebricksOptimizationContext((AlgebricksOptimizationContext) oc); + } } - private IOptimizationContextFactory optCtxFactory; + private final IOptimizationContextFactory optCtxFactory; public HeuristicCompilerFactoryBuilder() { this.optCtxFactory = DefaultOptimizationContextFactory.INSTANCE; @@ -77,42 +90,85 @@ public class HeuristicCompilerFactoryBuilder extends AbstractCompilerFactoryBuil @Override public ICompilerFactory create() { - return new ICompilerFactory() { - @Override - public ICompiler createCompiler(final ILogicalPlan plan, final IMetadataProvider<?, ?> metadata, - int varCounter) { - final IOptimizationContext oc = optCtxFactory.createOptimizationContext(varCounter, - expressionEvalSizeComputer, mergeAggregationExpressionFactory, expressionTypeComputer, - missableTypeComputer, conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, - warningCollector); - oc.setMetadataDeclarations(metadata); - final HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc); - return new ICompiler() { - - @Override - public void optimize() throws AlgebricksException { - opt.optimize(); - } - - @Override - public JobSpecification createJob(Object appContext, - IJobletEventListenerFactory jobEventListenerFactory) throws AlgebricksException { - AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n"); - JobGenContext context = new JobGenContext(null, metadata, appContext, - serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider, - comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory, - binaryIntegerInspectorFactory, printerProvider, missingWriterFactory, nullWriterFactory, - unnestingPositionWriterFactory, normalizedKeyComputerFactoryProvider, - expressionRuntimeProvider, expressionTypeComputer, oc, expressionEvalSizeComputer, - partialAggregationTypeComputer, predEvaluatorFactoryProvider, - physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector, - maxWarnings, physicalOptimizationConfig); - PlanCompiler pc = new PlanCompiler(context); - return pc.compilePlan(plan, jobEventListenerFactory); - } - }; + return new CompilerFactoryImpl(); + } + + private class CompilerFactoryImpl implements ICompilerFactory { + @Override + public ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter) { + IOptimizationContext optContext = + optCtxFactory.createOptimizationContext(varCounter, expressionEvalSizeComputer, + mergeAggregationExpressionFactory, expressionTypeComputer, missableTypeComputer, + conflictingTypeResolver, physicalOptimizationConfig, clusterLocations, warningCollector); + optContext.setMetadataDeclarations(metadata); + optContext.setCompilerFactory(this); + return new CompilerImpl(this, plan, optContext, logicalRewrites.get(), physicalRewrites.get(), + writerFactory); + } + + @Override + public ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext, + IRuleSetKind ruleSetKind) { + if (newOptContext.getCompilerFactory() != this) { + throw new IllegalStateException(); } - }; + return new CompilerImpl(this, plan, newOptContext, logicalRewritesByKind.apply(ruleSetKind), + physicalRewrites.get(), SerializedDataWriterFactory.WITHOUT_RECORD_DESCRIPTOR); + } + + private PlanCompiler createPlanCompiler(IOptimizationContext oc, Object appContext, + IAWriterFactory writerFactory) { + JobGenContext context = new JobGenContext(null, oc.getMetadataProvider(), appContext, + serializerDeserializerProvider, hashFunctionFactoryProvider, hashFunctionFamilyProvider, + comparatorFactoryProvider, typeTraitProvider, binaryBooleanInspectorFactory, + binaryIntegerInspectorFactory, printerProvider, writerFactory, resultSerializerFactoryProvider, + missingWriterFactory, nullWriterFactory, unnestingPositionWriterFactory, + normalizedKeyComputerFactoryProvider, expressionRuntimeProvider, expressionTypeComputer, oc, + expressionEvalSizeComputer, partialAggregationTypeComputer, predEvaluatorFactoryProvider, + physicalOptimizationConfig.getFrameSize(), clusterLocations, warningCollector, maxWarnings, + physicalOptimizationConfig); + return new PlanCompiler(context); + } } + private static class CompilerImpl implements ICompiler { + + private final CompilerFactoryImpl factory; + + private final ILogicalPlan plan; + + private final IOptimizationContext oc; + + private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites; + + private final List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites; + + private final IAWriterFactory writerFactory; + + private CompilerImpl(CompilerFactoryImpl factory, ILogicalPlan plan, IOptimizationContext oc, + List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> logicalRewrites, + List<Pair<AbstractRuleController, List<IAlgebraicRewriteRule>>> physicalRewrites, + IAWriterFactory writerFactory) { + this.factory = factory; + this.plan = plan; + this.oc = oc; + this.logicalRewrites = logicalRewrites; + this.physicalRewrites = physicalRewrites; + this.writerFactory = writerFactory; + } + + @Override + public void optimize() throws AlgebricksException { + HeuristicOptimizer opt = new HeuristicOptimizer(plan, logicalRewrites, physicalRewrites, oc); + opt.optimize(); + } + + @Override + public JobSpecification createJob(Object appContext, IJobletEventListenerFactory jobEventListenerFactory) + throws AlgebricksException { + AlgebricksConfig.ALGEBRICKS_LOGGER.trace("Starting Job Generation.\n"); + PlanCompiler pc = factory.createPlanCompiler(oc, appContext, writerFactory); + return pc.compilePlan(plan, jobEventListenerFactory); + } + } } diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java index 7c138eaac8..07a80349a7 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java @@ -19,8 +19,12 @@ package org.apache.hyracks.algebricks.compiler.api; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; +import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext; import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +import org.apache.hyracks.algebricks.core.rewriter.base.IRuleSetKind; public interface ICompilerFactory { ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter); + + ICompiler createCompiler(ILogicalPlan plan, IOptimizationContext newOptContext, IRuleSetKind ruleSetKind); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java index 7f2d3c8dcd..398217137d 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/Counter.java @@ -32,10 +32,18 @@ public class Counter { return counter; } + public int getAndInc() { + return counter++; + } + public void inc() { ++counter; } + public int incAndGet() { + return ++counter; + } + public void set(int newStart) { counter = newStart; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java index 166ab9a124..69ec210362 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/base/IOptimizationContext.java @@ -33,6 +33,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesV import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; import org.apache.hyracks.algebricks.core.rewriter.base.IAlgebraicRewriteRule; +import org.apache.hyracks.algebricks.core.rewriter.base.IOptimizationContextFactory; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; import org.apache.hyracks.api.exceptions.IWarningCollector; @@ -93,4 +94,10 @@ public interface IOptimizationContext extends ITypingContext, IVariableContext { public PlanStructureVerifier getPlanStructureVerifier(); public PlanStabilityVerifier getPlanStabilityVerifier(); + + void setCompilerFactory(Object factory); + + Object getCompilerFactory(); + + IOptimizationContextFactory getOptimizationContextFactory(); } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java index 77fdd74562..d350789dd7 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/metadata/IMetadataProvider.java @@ -31,7 +31,9 @@ import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier; import org.apache.hyracks.algebricks.core.algebra.functions.IFunctionInfo; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.data.IAWriterFactory; import org.apache.hyracks.algebricks.data.IPrinterFactory; +import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.runtime.base.AlgebricksPipeline; import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.dataflow.IOperatorDescriptor; @@ -56,11 +58,12 @@ public interface IMetadataProvider<S, I> { IProjectionInfo<?> projectionInfo) throws AlgebricksException; public Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> getWriteFileRuntime(IDataSink sink, - int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc) - throws AlgebricksException; + int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, + RecordDescriptor inputDesc) throws AlgebricksException; public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getResultHandleRuntime(IDataSink sink, - int[] printColumns, IPrinterFactory[] printerFactories, RecordDescriptor inputDesc, + int[] printColumns, IPrinterFactory[] printerFactories, IAWriterFactory writerFactory, + IResultSerializerFactoryProvider resultSerializerFactoryProvider, RecordDescriptor inputDesc, IResultMetadata metadata, JobSpecification spec) throws AlgebricksException; public Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> getWriteResultRuntime(IDataSource<S> dataSource, diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java index 138cff8960..1f7c16c591 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/DistributeResultPOperator.java @@ -102,8 +102,9 @@ public class DistributeResultPOperator extends AbstractPhysicalOperator { IPrinterFactory[] pf = JobGenHelper.mkPrinterFactories(inputSchemas[0], context.getTypeEnvironment(op), context, columns); - Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = mp.getResultHandleRuntime( - resultOp.getDataSink(), columns, pf, inputDesc, resultOp.getResultMetadata(), spec); + Pair<IOperatorDescriptor, AlgebricksPartitionConstraint> runtimeAndConstraints = + mp.getResultHandleRuntime(resultOp.getDataSink(), columns, pf, context.getWriterFactory(), + context.getResultSerializerFactoryProvider(), inputDesc, resultOp.getResultMetadata(), spec); IOperatorDescriptor opDesc = runtimeAndConstraints.first; opDesc.setSourceLocation(resultOp.getSourceLocation()); builder.contributeHyracksOperator(resultOp, opDesc); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java index 3521a27bff..07c798f2dc 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/operators/physical/SinkWritePOperator.java @@ -101,7 +101,7 @@ public class SinkWritePOperator extends AbstractPhysicalOperator { IMetadataProvider<?, ?> mp = context.getMetadataProvider(); Pair<IPushRuntimeFactory, AlgebricksPartitionConstraint> runtimeAndConstraints = - mp.getWriteFileRuntime(write.getDataSink(), columns, pf, inputDesc); + mp.getWriteFileRuntime(write.getDataSink(), columns, pf, context.getWriterFactory(), inputDesc); IPushRuntimeFactory runtime = runtimeAndConstraints.first; runtime.setSourceLocation(write.getSourceLocation()); diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java index 7c7d5a8670..471380cb75 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/jobgen/impl/JobGenContext.java @@ -35,6 +35,7 @@ import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; import org.apache.hyracks.algebricks.core.algebra.typing.ITypingContext; import org.apache.hyracks.algebricks.core.rewriter.base.PhysicalOptimizationConfig; +import org.apache.hyracks.algebricks.data.IAWriterFactory; import org.apache.hyracks.algebricks.data.IBinaryBooleanInspectorFactory; import org.apache.hyracks.algebricks.data.IBinaryComparatorFactoryProvider; import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFactoryProvider; @@ -42,6 +43,7 @@ import org.apache.hyracks.algebricks.data.IBinaryHashFunctionFamilyProvider; import org.apache.hyracks.algebricks.data.IBinaryIntegerInspectorFactory; import org.apache.hyracks.algebricks.data.INormalizedKeyComputerFactoryProvider; import org.apache.hyracks.algebricks.data.IPrinterFactoryProvider; +import org.apache.hyracks.algebricks.data.IResultSerializerFactoryProvider; import org.apache.hyracks.algebricks.data.ISerializerDeserializerProvider; import org.apache.hyracks.algebricks.data.ITypeTraitProvider; import org.apache.hyracks.algebricks.data.IUnnestingPositionWriterFactory; @@ -57,6 +59,8 @@ public class JobGenContext { private final IBinaryHashFunctionFamilyProvider hashFunctionFamilyProvider; private final IBinaryComparatorFactoryProvider comparatorFactoryProvider; private final IPrinterFactoryProvider printerFactoryProvider; + private final IAWriterFactory writerFactory; + private final IResultSerializerFactoryProvider resultSerializerFactoryProvider; private final ITypeTraitProvider typeTraitProvider; private final IMetadataProvider<?, ?> metadataProvider; private final IMissingWriterFactory missingWriterFactory; @@ -86,6 +90,7 @@ public class JobGenContext { IBinaryComparatorFactoryProvider comparatorFactoryProvider, ITypeTraitProvider typeTraitProvider, IBinaryBooleanInspectorFactory booleanInspectorFactory, IBinaryIntegerInspectorFactory integerInspectorFactory, IPrinterFactoryProvider printerFactoryProvider, + IAWriterFactory writerFactory, IResultSerializerFactoryProvider resultSerializerFactoryProvider, IMissingWriterFactory missingWriterFactory, IMissingWriterFactory nullWriterFactory, IUnnestingPositionWriterFactory unnestingPositionWriterFactory, INormalizedKeyComputerFactoryProvider normalizedKeyComputerFactoryProvider, @@ -106,6 +111,8 @@ public class JobGenContext { this.booleanInspectorFactory = booleanInspectorFactory; this.integerInspectorFactory = integerInspectorFactory; this.printerFactoryProvider = printerFactoryProvider; + this.writerFactory = writerFactory; + this.resultSerializerFactoryProvider = resultSerializerFactoryProvider; this.clusterLocations = clusterLocations; this.normalizedKeyComputerFactoryProvider = normalizedKeyComputerFactoryProvider; this.missingWriterFactory = missingWriterFactory; @@ -172,6 +179,14 @@ public class JobGenContext { return printerFactoryProvider; } + public IAWriterFactory getWriterFactory() { + return writerFactory; + } + + public IResultSerializerFactoryProvider getResultSerializerFactoryProvider() { + return resultSerializerFactoryProvider; + } + public IPredicateEvaluatorFactoryProvider getPredicateEvaluatorFactoryProvider() { return predEvaluatorFactoryProvider; } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java index d1fd247818..bee842ce70 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/AlgebricksOptimizationContext.java @@ -46,45 +46,29 @@ import org.apache.hyracks.algebricks.core.algebra.properties.FunctionalDependenc import org.apache.hyracks.algebricks.core.algebra.properties.ILogicalPropertiesVector; import org.apache.hyracks.algebricks.core.algebra.properties.INodeDomain; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.NoOpWarningCollector; /** * The Algebricks default implementation for IOptimizationContext. */ -@SuppressWarnings({ "unchecked", "rawtypes" }) +@SuppressWarnings({ "rawtypes" }) public class AlgebricksOptimizationContext implements IOptimizationContext { - private int varCounter; + private final IOptimizationContextFactory optContextFactory; private final IExpressionEvalSizeComputer expressionEvalSizeComputer; private final IMergeAggregationExpressionFactory mergeAggregationExpressionFactory; private final PhysicalOptimizationConfig physicalOptimizationConfig; - private final IVariableEvalSizeEnvironment varEvalSizeEnv = new IVariableEvalSizeEnvironment() { - - Map<LogicalVariable, Integer> varSizeMap = new HashMap<>(); - - @Override - public void setVariableEvalSize(LogicalVariable var, int size) { - varSizeMap.put(var, size); - } - - @Override - public int getVariableEvalSize(LogicalVariable var) { - return varSizeMap.get(var); - } - }; - - private Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>(); - - private Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>(); - private Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>(); - private Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>(); - - private IMetadataProvider metadataProvider; - private HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>(); + private final VariableEvalSizeEnvironmentImpl varEvalSizeEnv = new VariableEvalSizeEnvironmentImpl(); + private final Map<ILogicalOperator, IVariableTypeEnvironment> typeEnvMap = new HashMap<>(); + private final Map<ILogicalOperator, HashSet<ILogicalOperator>> alreadyCompared = new HashMap<>(); + private final Map<IAlgebraicRewriteRule, HashSet<ILogicalOperator>> dontApply = new HashMap<>(); + private final Map<LogicalVariable, FunctionalDependency> varToPrimaryKey = new HashMap<>(); + private final HashSet<LogicalVariable> notToBeInlinedVars = new HashSet<>(); protected final Map<ILogicalOperator, List<FunctionalDependency>> fdGlobalMap = new HashMap<>(); protected final Map<ILogicalOperator, Map<LogicalVariable, EquivalenceClass>> eqClassGlobalMap = new HashMap<>(); - protected final Map<ILogicalOperator, ILogicalPropertiesVector> logicalProps = new HashMap<>(); + private final IExpressionTypeComputer expressionTypeComputer; private final IMissableTypeComputer nullableTypeComputer; private final INodeDomain defaultNodeDomain; @@ -94,12 +78,18 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { private final PlanStructureVerifier planStructureVerifier; private final PlanStabilityVerifier planStabilityVerifier; - public AlgebricksOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer, + private int varCounter; + private IMetadataProvider metadataProvider; + private Object compilerFactory; + + public AlgebricksOptimizationContext(IOptimizationContextFactory optContextFactory, int varCounter, + IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer nullableTypeComputer, IConflictingTypeResolver conflictingTypeResovler, PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, IPlanPrettyPrinter prettyPrinter, IWarningCollector warningCollector) { + this.optContextFactory = optContextFactory; this.varCounter = varCounter; this.expressionEvalSizeComputer = expressionEvalSizeComputer; this.mergeAggregationExpressionFactory = mergeAggregationExpressionFactory; @@ -115,6 +105,35 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { this.planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(prettyPrinter) : null; } + public AlgebricksOptimizationContext(AlgebricksOptimizationContext from) { + optContextFactory = from.optContextFactory; + varCounter = from.varCounter; + expressionEvalSizeComputer = from.expressionEvalSizeComputer; + mergeAggregationExpressionFactory = from.mergeAggregationExpressionFactory; + expressionTypeComputer = from.expressionTypeComputer; + nullableTypeComputer = from.nullableTypeComputer; + physicalOptimizationConfig = from.physicalOptimizationConfig; + defaultNodeDomain = from.defaultNodeDomain; + prettyPrinter = from.prettyPrinter; + conflictingTypeResovler = from.conflictingTypeResovler; + warningCollector = NoOpWarningCollector.INSTANCE; + boolean isSanityCheckEnabled = physicalOptimizationConfig.isSanityCheckEnabled(); + planStructureVerifier = isSanityCheckEnabled ? new PlanStructureVerifier(from.prettyPrinter, this) : null; + planStabilityVerifier = isSanityCheckEnabled ? new PlanStabilityVerifier(from.prettyPrinter) : null; + metadataProvider = from.metadataProvider; + compilerFactory = from.compilerFactory; + + varEvalSizeEnv.varSizeMap.putAll(from.varEvalSizeEnv.varSizeMap); + typeEnvMap.putAll(from.typeEnvMap); + alreadyCompared.putAll(from.alreadyCompared); + dontApply.putAll(from.dontApply); + varToPrimaryKey.putAll(from.varToPrimaryKey); + notToBeInlinedVars.addAll(from.notToBeInlinedVars); + fdGlobalMap.putAll(from.fdGlobalMap); + eqClassGlobalMap.putAll(from.eqClassGlobalMap); + logicalProps.putAll(from.logicalProps); + } + @Override public int getVarCounter() { return varCounter; @@ -354,4 +373,34 @@ public class AlgebricksOptimizationContext implements IOptimizationContext { public PlanStabilityVerifier getPlanStabilityVerifier() { return planStabilityVerifier; } + + @Override + public IOptimizationContextFactory getOptimizationContextFactory() { + return optContextFactory; + } + + @Override + public void setCompilerFactory(Object compilerFactory) { + this.compilerFactory = compilerFactory; + } + + @Override + public Object getCompilerFactory() { + return compilerFactory; + } + + protected static class VariableEvalSizeEnvironmentImpl implements IVariableEvalSizeEnvironment { + + protected final Map<LogicalVariable, Integer> varSizeMap = new HashMap<>(); + + @Override + public void setVariableEvalSize(LogicalVariable var, int size) { + varSizeMap.put(var, size); + } + + @Override + public int getVariableEvalSize(LogicalVariable var) { + return varSizeMap.get(var); + } + } } diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java index 1c41e9aa42..c81025d82e 100644 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IOptimizationContextFactory.java @@ -28,10 +28,12 @@ import org.apache.hyracks.algebricks.core.algebra.expressions.IMissableTypeCompu import org.apache.hyracks.api.exceptions.IWarningCollector; public interface IOptimizationContextFactory { - public IOptimizationContext createOptimizationContext(int varCounter, + IOptimizationContext createOptimizationContext(int varCounter, IExpressionEvalSizeComputer expressionEvalSizeComputer, IMergeAggregationExpressionFactory mergeAggregationExpressionFactory, IExpressionTypeComputer expressionTypeComputer, IMissableTypeComputer missableTypeComputer, IConflictingTypeResolver conflictintTypeResolver, PhysicalOptimizationConfig physicalOptimizationConfig, AlgebricksPartitionConstraint clusterLocations, IWarningCollector warningCollector); + + IOptimizationContext cloneOptimizationContext(IOptimizationContext oc); } diff --git a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java similarity index 70% copy from hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java copy to hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java index 7c138eaac8..5ce61ec5a0 100644 --- a/hyracks-fullstack/algebricks/algebricks-compiler/src/main/java/org/apache/hyracks/algebricks/compiler/api/ICompilerFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/rewriter/base/IRuleSetKind.java @@ -16,11 +16,9 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.hyracks.algebricks.compiler.api; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalPlan; -import org.apache.hyracks.algebricks.core.algebra.metadata.IMetadataProvider; +package org.apache.hyracks.algebricks.core.rewriter.base; -public interface ICompilerFactory { - ICompiler createCompiler(ILogicalPlan plan, IMetadataProvider<?, ?> metadata, int varCounter); +public interface IRuleSetKind { + String name(); } diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java index 7525180008..75fd791b25 100644 --- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IResultSerializerFactoryProvider.java @@ -32,10 +32,8 @@ public interface IResultSerializerFactoryProvider extends Serializable { * - A printer factory array to print the tuple containing different fields. * @param writerFactory * - A writer factory to write the serialized data to the print stream. - * @param inputRecordDesc - * - The record descriptor describing the input frame to be serialized. * @return A new instance of result serialized appender. */ - public IResultSerializerFactory getAqlResultSerializerFactoryProvider(int[] fields, - IPrinterFactory[] printerFactories, IAWriterFactory writerFactory); + public IResultSerializerFactory getResultSerializerFactoryProvider(int[] fields, IPrinterFactory[] printerFactories, + IAWriterFactory writerFactory); } diff --git a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java index e1b2e5d013..1eda1339d4 100644 --- a/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java +++ b/hyracks-fullstack/algebricks/algebricks-rewriter/src/main/java/org/apache/hyracks/algebricks/rewriter/rules/PopulateResultMetadataRule.java @@ -47,7 +47,7 @@ public final class PopulateResultMetadataRule implements IAlgebraicRewriteRule { } DistributeResultOperator dop = (DistributeResultOperator) op; IResultMetadata resultMetadata = dop.getResultMetadata(); - if (resultMetadata.getOutputTypes() != null) { + if (resultMetadata == null || resultMetadata.getOutputTypes() != null) { return false; } List<Mutable<ILogicalExpression>> exprList = dop.getExpressions(); diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java index 763e6ff07c..90fa824b64 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/serializer/ResultSerializerFactoryProvider.java @@ -40,7 +40,7 @@ public class ResultSerializerFactoryProvider implements IResultSerializerFactory } @Override - public IResultSerializerFactory getAqlResultSerializerFactoryProvider(final int[] fields, + public IResultSerializerFactory getResultSerializerFactoryProvider(final int[] fields, final IPrinterFactory[] printerFactories, final IAWriterFactory writerFactory) { return new IResultSerializerFactory() { private static final long serialVersionUID = 1L; diff --git a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java index bc7634d7e3..8ce4f2bebc 100644 --- a/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java +++ b/hyracks-fullstack/algebricks/algebricks-runtime/src/main/java/org/apache/hyracks/algebricks/runtime/writers/SerializedDataWriterFactory.java @@ -32,7 +32,15 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class SerializedDataWriterFactory implements IAWriterFactory { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 2L; + + public static final SerializedDataWriterFactory WITHOUT_RECORD_DESCRIPTOR = new SerializedDataWriterFactory(false); + + private final boolean writeRecordDescriptor; + + public SerializedDataWriterFactory(boolean writeRecordDescriptor) { + this.writeRecordDescriptor = writeRecordDescriptor; + } @Override public IAWriter createWriter(final int[] fields, final PrintStream ps, IPrinterFactory[] printerFactories, @@ -41,15 +49,17 @@ public class SerializedDataWriterFactory implements IAWriterFactory { @Override public void init() throws HyracksDataException { - // dump the SerializerDeserializers to disk - try { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = new ObjectOutputStream(baos); - oos.writeObject(inputRecordDescriptor); - baos.writeTo(ps); - oos.close(); - } catch (IOException e) { - throw HyracksDataException.create(e); + if (writeRecordDescriptor) { + // dump the SerializerDeserializers to disk + try { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(baos); + oos.writeObject(inputRecordDescriptor); + baos.writeTo(ps); + oos.close(); + } catch (IOException e) { + throw HyracksDataException.create(e); + } } } diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java similarity index 88% copy from asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java copy to hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java index a036b7ef93..caf24646f1 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/NoOpWarningCollector.java +++ b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/NoOpWarningCollector.java @@ -17,10 +17,7 @@ * under the License. */ -package org.apache.asterix.common.exceptions; - -import org.apache.hyracks.api.exceptions.IWarningCollector; -import org.apache.hyracks.api.exceptions.Warning; +package org.apache.hyracks.api.exceptions; public final class NoOpWarningCollector implements IWarningCollector { diff --git a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java index 3f78234a0c..71c81a802d 100644 --- a/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java +++ b/hyracks-fullstack/hyracks/hyracks-test-support/src/main/java/org/apache/hyracks/test/support/TestUtils.java @@ -38,7 +38,7 @@ import org.apache.hyracks.api.dataflow.TaskAttemptId; import org.apache.hyracks.api.dataflow.TaskId; import org.apache.hyracks.api.exceptions.HyracksException; import org.apache.hyracks.api.exceptions.IWarningCollector; -import org.apache.hyracks.api.exceptions.Warning; +import org.apache.hyracks.api.exceptions.NoOpWarningCollector; import org.apache.hyracks.api.io.IODeviceHandle; import org.apache.hyracks.api.job.JobId; import org.apache.hyracks.api.util.CleanupUtils; @@ -52,22 +52,7 @@ import org.apache.logging.log4j.core.config.Configuration; public class TestUtils { private static final int DEFAULT_FRAME_SIZE = 32768; - public static final IWarningCollector NOOP_WARNING_COLLECTOR = new IWarningCollector() { - @Override - public void warn(Warning warning) { - // no-op - } - - @Override - public boolean shouldWarn() { - return false; - } - - @Override - public long getTotalWarningsCount() { - return 0; - } - }; + public static final IWarningCollector NOOP_WARNING_COLLECTOR = NoOpWarningCollector.INSTANCE; public static IHyracksTaskContext createHyracksTask() { return create(DEFAULT_FRAME_SIZE);
