Repository: asterixdb Updated Branches: refs/heads/master 56189fb11 -> 910303b46
Enable commit runtime extension Change-Id: I98083ea5e93cb5f45d92c5dfbacfee1020fad57a Reviewed-on: https://asterix-gerrit.ics.uci.edu/1485 Tested-by: Jenkins <[email protected]> BAD: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Reviewed-by: abdullah alamoudi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/910303b4 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/910303b4 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/910303b4 Branch: refs/heads/master Commit: 910303b4634f7ced6bde7dc2e6525db880975062 Parents: 56189fb Author: Abdullah Alamoudi <[email protected]> Authored: Fri Feb 3 23:02:25 2017 -0800 Committer: abdullah alamoudi <[email protected]> Committed: Sun Feb 5 09:24:38 2017 -0800 ---------------------------------------------------------------------- asterixdb/asterix-algebra/pom.xml | 4 - .../operators/physical/CommitPOperator.java | 30 ++- .../operators/physical/CommitRuntime.java | 187 ------------------ .../physical/CommitRuntimeFactory.java | 69 ------- .../operators/physical/UpsertCommitRuntime.java | 54 ------ .../rules/SetupCommitExtensionOpRule.java | 34 ++-- .../app/bootstrap/TestNodeController.java | 2 +- .../external/library/java/JObjectAccessors.java | 27 ++- .../asterix/metadata/entities/Dataset.java | 25 +-- .../printers/adm/ABooleanPrinterFactory.java | 4 +- .../adm/ShortWithoutTypeInfoPrinterFactory.java | 4 +- .../printers/csv/ABooleanPrinterFactory.java | 4 +- .../json/clean/ABooleanPrinterFactory.java | 4 +- .../json/lossless/ABooleanPrinterFactory.java | 4 +- .../serde/ABooleanSerializerDeserializer.java | 8 +- .../evaluators/common/GramTokensEvaluator.java | 4 +- .../evaluators/functions/AndDescriptor.java | 14 +- ...EditDistanceStringIsFilterableEvaluator.java | 18 +- .../evaluators/functions/NotDescriptor.java | 8 +- .../evaluators/functions/OrDescriptor.java | 14 +- asterixdb/asterix-transactions/pom.xml | 21 ++- .../management/runtime/CommitRuntime.java | 188 +++++++++++++++++++ .../runtime/CommitRuntimeFactory.java | 69 +++++++ .../management/runtime/UpsertCommitRuntime.java | 54 ++++++ .../data/std/primitive/BooleanPointable.java | 2 +- 25 files changed, 416 insertions(+), 436 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/pom.xml b/asterixdb/asterix-algebra/pom.xml index 8588381..5a723b3 100644 --- a/asterixdb/asterix-algebra/pom.xml +++ b/asterixdb/asterix-algebra/pom.xml @@ -226,10 +226,6 @@ </dependency> <dependency> <groupId>org.apache.hyracks</groupId> - <artifactId>hyracks-storage-am-bloomfilter</artifactId> - </dependency> - <dependency> - <groupId>org.apache.hyracks</groupId> <artifactId>algebricks-runtime</artifactId> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java index 1c01c40..d0cee55 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitPOperator.java @@ -23,6 +23,7 @@ import java.util.List; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.core.algebra.base.IHyracksJobBuilder; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; @@ -36,6 +37,7 @@ import org.apache.hyracks.algebricks.core.algebra.properties.IPhysicalProperties import org.apache.hyracks.algebricks.core.algebra.properties.PhysicalRequirements; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenHelper; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.dataflow.value.RecordDescriptor; import org.apache.hyracks.api.io.FileSplit; @@ -43,20 +45,16 @@ public class CommitPOperator extends AbstractPhysicalOperator { private final List<LogicalVariable> primaryKeyLogicalVars; private final JobId jobId; - private final int datasetId; - private final String dataverse; - private final String dataset; + private final Dataset dataset; private final LogicalVariable upsertVar; private final boolean isSink; - public CommitPOperator(JobId jobId, String dataverse, String dataset, int datasetId, - List<LogicalVariable> primaryKeyLogicalVars, LogicalVariable upsertVar, boolean isSink) { + public CommitPOperator(JobId jobId, Dataset dataset, List<LogicalVariable> primaryKeyLogicalVars, + LogicalVariable upsertVar, boolean isSink) { this.jobId = jobId; - this.datasetId = datasetId; + this.dataset = dataset; this.primaryKeyLogicalVars = primaryKeyLogicalVars; this.upsertVar = upsertVar; - this.dataverse = dataverse; - this.dataset = dataset; this.isSink = isSink; } @@ -86,28 +84,26 @@ public class CommitPOperator extends AbstractPhysicalOperator { @Override public void contributeRuntimeOperator(IHyracksJobBuilder builder, JobGenContext context, ILogicalOperator op, IOperatorSchema propagatedSchema, IOperatorSchema[] inputSchemas, IOperatorSchema outerPlanSchema) - throws AlgebricksException { + throws AlgebricksException { MetadataProvider metadataProvider = (MetadataProvider) context.getMetadataProvider(); - RecordDescriptor recDesc = JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, - context); + RecordDescriptor recDesc = + JobGenHelper.mkRecordDescriptor(context.getTypeEnvironment(op), propagatedSchema, context); int[] primaryKeyFields = JobGenHelper.variablesToFieldIndexes(primaryKeyLogicalVars, inputSchemas[0]); //get dataset splits FileSplit[] splitsForDataset = metadataProvider.splitsForDataset(metadataProvider.getMetadataTxnContext(), - dataverse, dataset, dataset, metadataProvider.isTemporaryDatasetWriteJob()); + dataset.getDataverseName(), dataset.getDatasetName(), dataset.getDatasetName(), + metadataProvider.isTemporaryDatasetWriteJob()); int[] datasetPartitions = new int[splitsForDataset.length]; for (int i = 0; i < splitsForDataset.length; i++) { datasetPartitions[i] = i; } - int upsertVarIdx = -1; - CommitRuntimeFactory runtime = null; if (upsertVar != null) { upsertVarIdx = inputSchemas[0].findVariable(upsertVar); } - runtime = new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields, - metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx, - datasetPartitions, isSink); + IPushRuntimeFactory runtime = dataset.getCommitRuntimeFactory(jobId, primaryKeyFields, metadataProvider, + upsertVarIdx, datasetPartitions, isSink); builder.contributeMicroOperator(op, runtime, recDesc); ILogicalOperator src = op.getInputs().get(0).getValue(); builder.contributeGraphEdge(src, 0, op, 0); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java deleted file mode 100644 index 63a91ac..0000000 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntime.java +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.algebra.operators.physical; - -import java.nio.ByteBuffer; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.ILogManager; -import org.apache.asterix.common.transactions.ILogMarkerCallback; -import org.apache.asterix.common.transactions.ITransactionContext; -import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.common.transactions.LogRecord; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.utils.TransactionUtil; -import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; -import org.apache.hyracks.api.comm.VSizeFrame; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.dataflow.value.RecordDescriptor; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.util.HyracksConstants; -import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; -import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; -import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; -import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; -import org.apache.hyracks.dataflow.common.utils.TaskUtil; -import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit; - -public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { - - private final static long SEED = 0L; - - protected final ITransactionManager transactionManager; - protected final ILogManager logMgr; - protected final JobId jobId; - protected final int datasetId; - protected final int[] primaryKeyFields; - protected final boolean isTemporaryDatasetWriteJob; - protected final boolean isWriteTransaction; - protected final long[] longHashes; - protected final IHyracksTaskContext ctx; - protected final int resourcePartition; - protected ITransactionContext transactionContext; - protected LogRecord logRecord; - protected final boolean isSink; - - public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields, - boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) { - this.ctx = ctx; - IAppRuntimeContext runtimeCtx = - (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); - this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager(); - this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager(); - this.jobId = jobId; - this.datasetId = datasetId; - this.primaryKeyFields = primaryKeyFields; - this.tRef = new FrameTupleReference(); - this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; - this.isWriteTransaction = isWriteTransaction; - this.resourcePartition = resourcePartition; - this.isSink = isSink; - longHashes = new long[2]; - } - - @Override - public void open() throws HyracksDataException { - try { - transactionContext = transactionManager.getTransactionContext(jobId, false); - transactionContext.setWriteTxn(isWriteTransaction); - ILogMarkerCallback callback = - TaskUtil.<ILogMarkerCallback> get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx); - logRecord = new LogRecord(callback); - if (isSink) { - return; - } - initAccessAppend(ctx); - writer.open(); - } catch (ACIDException e) { - throw new HyracksDataException(e); - } - } - - @Override - public void nextFrame(ByteBuffer buffer) throws HyracksDataException { - tAccess.reset(buffer); - int nTuple = tAccess.getTupleCount(); - for (int t = 0; t < nTuple; t++) { - if (isTemporaryDatasetWriteJob) { - /** - * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock - * and does not generate any write-ahead update and commit log but generates flush log and job commit - * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call - * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of - * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing - * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too. - */ - transactionContext.notifyOptracker(false); - } else { - tRef.reset(tAccess, t); - try { - formLogRecord(buffer, t); - logMgr.log(logRecord); - if (!isSink) { - appendTupleToFrame(t); - } - } catch (ACIDException e) { - throw new HyracksDataException(e); - } - } - } - VSizeFrame message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx); - if (message != null - && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) { - try { - formMarkerLogRecords(message.getBuffer()); - logMgr.log(logRecord); - } catch (ACIDException e) { - throw new HyracksDataException(e); - } - message.reset(); - message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); - message.getBuffer().flip(); - } - } - - private void formMarkerLogRecords(ByteBuffer marker) { - TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker); - } - - protected void formLogRecord(ByteBuffer buffer, int t) { - int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields); - TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef, - primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT); - } - - protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) { - MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes); - return Math.abs((int) longHashes[0]); - } - - @Override - public void fail() throws HyracksDataException { - failed = true; - if (isSink) { - return; - } - writer.fail(); - } - - @Override - public void close() throws HyracksDataException { - if (isSink) { - return; - } - flushIfNotFailed(); - writer.close(); - appender.reset(frame, true); - } - - @Override - public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { - this.inputRecordDesc = recordDescriptor; - this.tAccess = new FrameTupleAccessor(inputRecordDesc); - } - - @Override - public void flush() throws HyracksDataException { - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java deleted file mode 100644 index 767d864..0000000 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/CommitRuntimeFactory.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.algebra.operators.physical; - -import org.apache.asterix.common.transactions.JobId; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; -import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.api.exceptions.HyracksDataException; - -public class CommitRuntimeFactory implements IPushRuntimeFactory { - - private static final long serialVersionUID = 1L; - - private final JobId jobId; - private final int datasetId; - private final int[] primaryKeyFields; - private final boolean isTemporaryDatasetWriteJob; - private final boolean isWriteTransaction; - private final int upsertVarIdx; - private int[] datasetPartitions; - private final boolean isSink; - - public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, - boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) { - this.jobId = jobId; - this.datasetId = datasetId; - this.primaryKeyFields = primaryKeyFields; - this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; - this.isWriteTransaction = isWriteTransaction; - this.upsertVarIdx = upsertVarIdx; - this.datasetPartitions = datasetPartitions; - this.isSink = isSink; - } - - @Override - public String toString() { - return "commit"; - } - - @Override - public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { - if (upsertVarIdx >= 0) { - return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, - isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], - upsertVarIdx, isSink); - } else { - return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, - isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java deleted file mode 100644 index 53e0f62..0000000 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/algebra/operators/physical/UpsertCommitRuntime.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.algebra.operators.physical; - -import java.nio.ByteBuffer; - -import org.apache.asterix.common.transactions.JobId; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.utils.TransactionUtil; -import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; -import org.apache.hyracks.api.context.IHyracksTaskContext; - -public class UpsertCommitRuntime extends CommitRuntime { - private final int upsertIdx; - - public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields, - boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx, - boolean isSink) { - super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, - resourcePartition, isSink); - this.upsertIdx = upsertIdx; - } - - @Override - protected void formLogRecord(ByteBuffer buffer, int t) { - boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength() - + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1); - if (isNull) { - // Previous record not found (insert) - super.formLogRecord(buffer, t); - } else { - // Previous record found (delete + insert) - int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields); - TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef, - primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java index 5bafe83..9b442ae 100644 --- a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java +++ b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/SetupCommitExtensionOpRule.java @@ -27,6 +27,7 @@ import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.lang.common.util.FunctionUtil; import org.apache.asterix.metadata.declared.DatasetDataSource; import org.apache.asterix.metadata.declared.MetadataProvider; +import org.apache.asterix.metadata.entities.Dataset; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.commons.lang3.mutable.Mutable; import org.apache.commons.lang3.mutable.MutableObject; @@ -70,9 +71,7 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule { boolean isSink = ((CommitOperator) eOp.getDelegate()).isSink(); List<Mutable<ILogicalExpression>> primaryKeyExprs = null; - int datasetId = 0; - String dataverse = null; - String datasetName = null; + Dataset dataset = null; AbstractLogicalOperator descendantOp = (AbstractLogicalOperator) eOp.getInputs().get(0).getValue(); LogicalVariable upsertVar = null; while (descendantOp != null) { @@ -80,29 +79,19 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule { IndexInsertDeleteUpsertOperator operator = (IndexInsertDeleteUpsertOperator) descendantOp; if (!operator.isBulkload() && operator.getPrevSecondaryKeyExprs() == null) { primaryKeyExprs = operator.getPrimaryKeyExpressions(); - datasetId = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset() - .getDatasetId(); - dataverse = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset() - .getDataverseName(); - datasetName = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset() - .getDatasetName(); + dataset = ((DatasetDataSource) operator.getDataSourceIndex().getDataSource()).getDataset(); break; } } else if (descendantOp.getOperatorTag() == LogicalOperatorTag.INSERT_DELETE_UPSERT) { InsertDeleteUpsertOperator insertDeleteUpsertOperator = (InsertDeleteUpsertOperator) descendantOp; if (!insertDeleteUpsertOperator.isBulkload()) { primaryKeyExprs = insertDeleteUpsertOperator.getPrimaryKeyExpressions(); - datasetId = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset() - .getDatasetId(); - dataverse = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset() - .getDataverseName(); - datasetName = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset() - .getDatasetName(); + dataset = ((DatasetDataSource) insertDeleteUpsertOperator.getDataSource()).getDataset(); if (insertDeleteUpsertOperator.getOperation() == Kind.UPSERT) { //we need to add a function that checks if previous record was found upsertVar = context.newVar(); - AbstractFunctionCallExpression orFunc = new ScalarFunctionCallExpression( - FunctionUtil.getFunctionInfo(BuiltinFunctions.OR)); + AbstractFunctionCallExpression orFunc = + new ScalarFunctionCallExpression(FunctionUtil.getFunctionInfo(BuiltinFunctions.OR)); // is new value missing? -> this means that the expected operation is delete AbstractFunctionCallExpression isNewMissingFunc = new ScalarFunctionCallExpression( FunctionUtil.getFunctionInfo(BuiltinFunctions.IS_MISSING)); @@ -116,11 +105,10 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule { orFunc.getArguments().add(new MutableObject<ILogicalExpression>(isNewMissingFunc)); // AssignOperator puts in the cast var the casted record - AssignOperator upsertFlagAssign = new AssignOperator(upsertVar, - new MutableObject<ILogicalExpression>(orFunc)); + AssignOperator upsertFlagAssign = + new AssignOperator(upsertVar, new MutableObject<ILogicalExpression>(orFunc)); // Connect the current top of the plan to the cast operator - upsertFlagAssign.getInputs() - .add(new MutableObject<ILogicalOperator>(eOp.getInputs().get(0).getValue())); + upsertFlagAssign.getInputs().add(new MutableObject<>(eOp.getInputs().get(0).getValue())); eOp.getInputs().clear(); eOp.getInputs().add(new MutableObject<ILogicalOperator>(upsertFlagAssign)); context.computeAndSetTypeEnvironmentForOperator(upsertFlagAssign); @@ -151,8 +139,8 @@ public class SetupCommitExtensionOpRule implements IAlgebraicRewriteRule { //create the logical and physical operator CommitOperator commitOperator = new CommitOperator(primaryKeyLogicalVars, upsertVar, isSink); - CommitPOperator commitPOperator = new CommitPOperator(jobId, dataverse, datasetName, datasetId, - primaryKeyLogicalVars, upsertVar, isSink); + CommitPOperator commitPOperator = + new CommitPOperator(jobId, dataset, primaryKeyLogicalVars, upsertVar, isSink); commitOperator.setPhysicalOperator(commitPOperator); //create ExtensionOperator and put the commitOperator in it. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java index bc18045..cc12f36 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java @@ -25,7 +25,6 @@ import java.util.List; import java.util.Map; import java.util.logging.Logger; -import org.apache.asterix.algebra.operators.physical.CommitRuntime; import org.apache.asterix.app.external.TestLibrarian; import org.apache.asterix.app.nc.NCAppRuntimeContext; import org.apache.asterix.app.nc.TransactionSubsystem; @@ -57,6 +56,7 @@ import org.apache.asterix.test.runtime.ExecutionTestUtil; import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.resource.LSMBTreeLocalResourceMetadataFactory; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceFactoryProvider; +import org.apache.asterix.transaction.management.runtime.CommitRuntime; import org.apache.asterix.transaction.management.service.logging.LogReader; import org.apache.commons.lang3.StringUtils; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java index 1f1c139..f8755b4 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/library/java/JObjectAccessors.java @@ -18,6 +18,12 @@ */ package org.apache.asterix.external.library.java; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.LinkedHashMap; +import java.util.List; + import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; @@ -87,12 +93,6 @@ import org.apache.asterix.om.util.container.IObjectPool; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.util.string.UTF8StringReader; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.IOException; -import java.util.LinkedHashMap; -import java.util.List; - public class JObjectAccessors { public static IJObjectAccessor createFlatJObjectAccessor(ATypeTag aTypeTag) { @@ -455,7 +455,7 @@ public class JObjectAccessors { this.typeInfo = new TypeInfo(objectPool, null, null); this.jObjects = new IJObject[recordType.getFieldNames().length]; this.jRecord = new JRecord(recordType, jObjects); - this.openFields = new LinkedHashMap<String, IJObject>(); + this.openFields = new LinkedHashMap<>(); } @Override @@ -473,12 +473,11 @@ public class JObjectAccessors { for (IVisitablePointable fieldPointable : fieldPointables) { closedPart = index < recordType.getFieldTypes().length; IVisitablePointable tt = fieldTypeTags.get(index); - ATypeTag typeTag = EnumDeserializer.ATYPETAGDESERIALIZER - .deserialize(tt.getByteArray()[tt.getStartOffset()]); + ATypeTag typeTag = + EnumDeserializer.ATYPETAGDESERIALIZER.deserialize(tt.getByteArray()[tt.getStartOffset()]); IAType fieldType; - fieldType = closedPart ? - recordType.getFieldTypes()[index] : - TypeTagUtil.getBuiltinTypeByTag(typeTag); + fieldType = + closedPart ? recordType.getFieldTypes()[index] : TypeTagUtil.getBuiltinTypeByTag(typeTag); IVisitablePointable fieldName = fieldNames.get(index); typeInfo.reset(fieldType, typeTag); switch (typeTag) { @@ -491,8 +490,8 @@ public class JObjectAccessors { // value is null fieldObject = null; } else { - fieldObject = pointableVisitor - .visit((AListVisitablePointable) fieldPointable, typeInfo); + fieldObject = + pointableVisitor.visit((AListVisitablePointable) fieldPointable, typeInfo); } break; case ANY: http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java index 4ebf055..55cd304 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java @@ -22,6 +22,7 @@ package org.apache.asterix.metadata.entities; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.logging.Level; import java.util.logging.Logger; @@ -66,11 +67,13 @@ import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexOpera import org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearchOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.TempDatasetSecondaryIndexModificationOperationCallbackFactory; import org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory; +import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory; import org.apache.commons.lang3.mutable.MutableBoolean; import org.apache.commons.lang3.mutable.MutableObject; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.algebricks.common.utils.Pair; import org.apache.hyracks.algebricks.core.algebra.base.ILogicalOperator; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; import org.apache.hyracks.api.client.IHyracksClientConnection; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; import org.apache.hyracks.api.dataflow.value.ITypeTraits; @@ -241,13 +244,8 @@ public class Dataset implements IMetadataEntity<Dataset> { return false; } Dataset otherDataset = (Dataset) other; - if (!otherDataset.dataverseName.equals(dataverseName)) { - return false; - } - if (!otherDataset.datasetName.equals(datasetName)) { - return false; - } - return true; + return Objects.equals(dataverseName, otherDataset.dataverseName) + && Objects.equals(datasetName, otherDataset.datasetName); } public boolean allow(ILogicalOperator topOp, byte operation) {//NOSONAR: this method is meant to be extended @@ -567,10 +565,13 @@ public class Dataset implements IMetadataEntity<Dataset> { @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((datasetName == null) ? 0 : datasetName.hashCode()); - result = prime * result + ((dataverseName == null) ? 0 : dataverseName.hashCode()); - return result; + return Objects.hash(dataverseName, datasetName); + } + + public IPushRuntimeFactory getCommitRuntimeFactory(JobId jobId, int[] primaryKeyFields, + MetadataProvider metadataProvider, int upsertVarIdx, int[] datasetPartitions, boolean isSink) { + return new CommitRuntimeFactory(jobId, datasetId, primaryKeyFields, + metadataProvider.isTemporaryDatasetWriteJob(), metadataProvider.isWriteTransaction(), upsertVarIdx, + datasetPartitions, isSink); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java index 511ea9f..0a2f166 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ABooleanPrinterFactory.java @@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory { private static final long serialVersionUID = 1L; public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory(); - public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps - .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); + public static final IPrinter PRINTER = + (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); @Override public IPrinter createPrinter() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java index 666fa0a..2a878ac 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/adm/ShortWithoutTypeInfoPrinterFactory.java @@ -29,8 +29,8 @@ public class ShortWithoutTypeInfoPrinterFactory implements IPrinterFactory { private static final long serialVersionUID = 1L; public static final ShortWithoutTypeInfoPrinterFactory INSTANCE = new ShortWithoutTypeInfoPrinterFactory(); - public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps - .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); + public static final IPrinter PRINTER = + (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); @Override public IPrinter createPrinter() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java index 4aa6ccd..c500e86 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/csv/ABooleanPrinterFactory.java @@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory { private static final long serialVersionUID = 1L; public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory(); - public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps - .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); + public static final IPrinter PRINTER = + (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); @Override public IPrinter createPrinter() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java index ebc09a0..aa6fcbe 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/clean/ABooleanPrinterFactory.java @@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory { private static final long serialVersionUID = 1L; public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory(); - public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> ps - .print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); + public static final IPrinter PRINTER = + (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); @Override public IPrinter createPrinter() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java index b65897b..959c4ad 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/printers/json/lossless/ABooleanPrinterFactory.java @@ -29,8 +29,8 @@ public class ABooleanPrinterFactory implements IPrinterFactory { private static final long serialVersionUID = 1L; public static final ABooleanPrinterFactory INSTANCE = new ABooleanPrinterFactory(); - public static final IPrinter PRINTER = (byte[] b, int s, int l, PrintStream ps) -> - ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); + public static final IPrinter PRINTER = + (byte[] b, int s, int l, PrintStream ps) -> ps.print(ABooleanSerializerDeserializer.getBoolean(b, s + 1)); @Override public IPrinter createPrinter() { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java index 227c2cd..7d6a078 100644 --- a/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java +++ b/asterixdb/asterix-om/src/main/java/org/apache/asterix/dataflow/data/nontagged/serde/ABooleanSerializerDeserializer.java @@ -29,7 +29,6 @@ import org.apache.hyracks.api.exceptions.HyracksDataException; public class ABooleanSerializerDeserializer implements ISerializerDeserializer<ABoolean> { private static final long serialVersionUID = 1L; - public static final ABooleanSerializerDeserializer INSTANCE = new ABooleanSerializerDeserializer(); private ABooleanSerializerDeserializer() { @@ -54,11 +53,6 @@ public class ABooleanSerializerDeserializer implements ISerializerDeserializer<A } public static boolean getBoolean(byte[] bytes, int offset) { - if (bytes[offset] == 0) { - return false; - } else { - return true; - } + return bytes[offset] != 0; } - } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java index d9cfc67..ef727c9 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/common/GramTokensEvaluator.java @@ -22,6 +22,7 @@ import java.io.DataOutput; import java.io.IOException; import org.apache.asterix.builders.OrderedListBuilder; +import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; import org.apache.asterix.om.functions.BuiltinFunctions; import org.apache.asterix.om.types.AOrderedListType; import org.apache.asterix.om.types.BuiltinType; @@ -31,7 +32,6 @@ import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.BooleanPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference; @@ -75,7 +75,7 @@ public class GramTokensEvaluator implements IScalarEvaluator { int gramLength = ATypeHierarchy.getIntegerValue(BuiltinFunctions.GRAM_TOKENS.getName(), 1, gramLengthArg.getByteArray(), gramLengthArg.getStartOffset()); tokenizer.setGramlength(gramLength); - boolean prePost = BooleanPointable.getBoolean(prePostArg.getByteArray(), + boolean prePost = ABooleanSerializerDeserializer.getBoolean(prePostArg.getByteArray(), prePostArg.getStartOffset() + typeIndicatorSize); tokenizer.setPrePost(prePost); tokenizer.reset(stringArg.getByteArray(), stringArg.getStartOffset(), stringArg.getLength()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java index 9fd5dc4..e9f9c9e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/AndDescriptor.java @@ -74,14 +74,14 @@ public class AndDescriptor extends AbstractScalarFunctionDynamicDescriptor { return new IScalarEvaluator() { @SuppressWarnings("unchecked") - private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ABOOLEAN); + private ISerializerDeserializer<ABoolean> booleanSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN); @SuppressWarnings("unchecked") - private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ANULL); + private ISerializerDeserializer<ANull> nullSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); @SuppressWarnings("unchecked") - private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AMISSING); + private ISerializerDeserializer<AMissing> missingSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING); @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { @@ -111,7 +111,7 @@ public class AndDescriptor extends AbstractScalarFunctionDynamicDescriptor { ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG); } boolean argResult = ABooleanSerializerDeserializer.getBoolean(bytes, offset + 1); - if (! argResult) { + if (!argResult) { // anything AND FALSE = FALSE booleanSerde.serialize(ABoolean.FALSE, out); result.set(resultStorage); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java index f6c8c4f..0509f51 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/EditDistanceStringIsFilterableEvaluator.java @@ -22,6 +22,7 @@ package org.apache.asterix.runtime.evaluators.functions; import java.io.DataOutput; import java.io.IOException; +import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider; import org.apache.asterix.om.base.ABoolean; import org.apache.asterix.om.functions.BuiltinFunctions; @@ -35,7 +36,6 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.data.std.api.IPointable; -import org.apache.hyracks.data.std.primitive.BooleanPointable; import org.apache.hyracks.data.std.primitive.UTF8StringPointable; import org.apache.hyracks.data.std.primitive.VoidPointable; import org.apache.hyracks.data.std.util.ArrayBackedValueStorage; @@ -56,8 +56,8 @@ public class EditDistanceStringIsFilterableEvaluator implements IScalarEvaluator protected final IScalarEvaluator usePrePostEval; @SuppressWarnings("unchecked") - private final ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ABOOLEAN); + private final ISerializerDeserializer<ABoolean> booleanSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN); private final UTF8StringPointable utf8Ptr = new UTF8StringPointable(); @@ -88,14 +88,12 @@ public class EditDistanceStringIsFilterableEvaluator implements IScalarEvaluator int strLen = utf8Ptr.getStringLength(); // Check type and extract edit-distance threshold. - long edThresh = ATypeHierarchy.getIntegerValue( - BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1, edThreshPtr.getByteArray(), - edThreshPtr.getStartOffset()); + long edThresh = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 1, + edThreshPtr.getByteArray(), edThreshPtr.getStartOffset()); // Check type and extract gram length. - long gramLen = ATypeHierarchy.getIntegerValue( - BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2, gramLenPtr.getByteArray(), - gramLenPtr.getStartOffset()); + long gramLen = ATypeHierarchy.getIntegerValue(BuiltinFunctions.EDIT_DISTANCE_LIST_IS_FILTERABLE.getName(), 2, + gramLenPtr.getByteArray(), gramLenPtr.getStartOffset()); // Check type and extract usePrePost flag. typeTag = usePrePostPtr.getByteArray()[usePrePostPtr.getStartOffset()]; @@ -103,7 +101,7 @@ public class EditDistanceStringIsFilterableEvaluator implements IScalarEvaluator throw new TypeMismatchException(BuiltinFunctions.EDIT_DISTANCE_STRING_IS_FILTERABLE, 3, typeTag, ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG); } - boolean usePrePost = BooleanPointable.getBoolean(usePrePostPtr.getByteArray(), + boolean usePrePost = ABooleanSerializerDeserializer.getBoolean(usePrePostPtr.getByteArray(), usePrePostPtr.getStartOffset() + 1); // Compute result. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java index 32263ea..13037a9 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/NotDescriptor.java @@ -71,8 +71,8 @@ public class NotDescriptor extends AbstractScalarFunctionDynamicDescriptor { private IScalarEvaluator eval = args[0].createScalarEvaluator(ctx); @SuppressWarnings("unchecked") - private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ABOOLEAN); + private ISerializerDeserializer<ABoolean> booleanSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN); @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { @@ -86,8 +86,8 @@ public class NotDescriptor extends AbstractScalarFunctionDynamicDescriptor { ABoolean aResult = argRes ? ABoolean.FALSE : ABoolean.TRUE; booleanSerde.serialize(aResult, out); } else { - throw new TypeMismatchException(getIdentifier(), 0, - bytes[offset], ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG); + throw new TypeMismatchException(getIdentifier(), 0, bytes[offset], + ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG); } result.set(resultStorage); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java index c7a608a..7aea25c 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/evaluators/functions/OrDescriptor.java @@ -74,14 +74,14 @@ public class OrDescriptor extends AbstractScalarFunctionDynamicDescriptor { private ArrayBackedValueStorage resultStorage = new ArrayBackedValueStorage(); private DataOutput output = resultStorage.getDataOutput(); @SuppressWarnings("unchecked") - private ISerializerDeserializer<ABoolean> booleanSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ABOOLEAN); + private ISerializerDeserializer<ABoolean> booleanSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ABOOLEAN); @SuppressWarnings("unchecked") - private ISerializerDeserializer<ANull> nullSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.ANULL); + private ISerializerDeserializer<ANull> nullSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ANULL); @SuppressWarnings("unchecked") - private ISerializerDeserializer<AMissing> missingSerde = SerializerDeserializerProvider.INSTANCE - .getSerializerDeserializer(BuiltinType.AMISSING); + private ISerializerDeserializer<AMissing> missingSerde = + SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.AMISSING); @Override public void evaluate(IFrameTupleReference tuple, IPointable result) throws HyracksDataException { @@ -107,7 +107,7 @@ public class OrDescriptor extends AbstractScalarFunctionDynamicDescriptor { ATypeTag.SERIALIZED_BOOLEAN_TYPE_TAG); } boolean argResult = ABooleanSerializerDeserializer.getBoolean(data, offset + 1); - if (argResult == true) { + if (argResult) { // anything OR TRUE = TRUE booleanSerde.serialize(ABoolean.TRUE, output); result.set(resultStorage); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/pom.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/pom.xml b/asterixdb/asterix-transactions/pom.xml index 794595f..a65a436 100644 --- a/asterixdb/asterix-transactions/pom.xml +++ b/asterixdb/asterix-transactions/pom.xml @@ -24,7 +24,6 @@ <version>0.9.1-SNAPSHOT</version> </parent> <artifactId>asterix-transactions</artifactId> - <licenses> <license> <name>Apache License, Version 2.0</name> @@ -33,11 +32,9 @@ <comments>A business-friendly OSS license</comments> </license> </licenses> - <properties> <appendedResourcesDirectory>${basedir}/../src/main/appended-resources</appendedResourcesDirectory> </properties> - <build> <plugins> <plugin> @@ -83,9 +80,7 @@ </executions> </plugin> </plugins> - </build> - <dependencies> <dependency> <groupId>org.apache.hyracks</groupId> @@ -111,6 +106,11 @@ <scope>compile</scope> </dependency> <dependency> + <groupId>org.apache.asterix</groupId> + <artifactId>asterix-om</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> </dependency> @@ -149,6 +149,13 @@ <groupId>org.apache.hyracks</groupId> <artifactId>hyracks-storage-common</artifactId> </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>hyracks-storage-am-bloomfilter</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hyracks</groupId> + <artifactId>algebricks-runtime</artifactId> + </dependency> </dependencies> - -</project> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java new file mode 100644 index 0000000..d38c5b7 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntime.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.transaction.management.runtime; + +import java.nio.ByteBuffer; + +import org.apache.asterix.common.api.IAppRuntimeContext; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.ILogMarkerCallback; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.LogRecord; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.hyracks.algebricks.runtime.operators.base.AbstractOneInputOneOutputOneFramePushRuntime; +import org.apache.hyracks.api.comm.VSizeFrame; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.dataflow.value.RecordDescriptor; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.util.HyracksConstants; +import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAccessor; +import org.apache.hyracks.dataflow.common.data.accessors.FrameTupleReference; +import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference; +import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; +import org.apache.hyracks.storage.am.bloomfilter.impls.MurmurHash128Bit; + +public class CommitRuntime extends AbstractOneInputOneOutputOneFramePushRuntime { + + private final static long SEED = 0L; + + protected final ITransactionManager transactionManager; + protected final ILogManager logMgr; + protected final JobId jobId; + protected final int datasetId; + protected final int[] primaryKeyFields; + protected final boolean isTemporaryDatasetWriteJob; + protected final boolean isWriteTransaction; + protected final long[] longHashes; + protected final IHyracksTaskContext ctx; + protected final int resourcePartition; + protected ITransactionContext transactionContext; + protected LogRecord logRecord; + protected final boolean isSink; + + public CommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields, + boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, boolean isSink) { + this.ctx = ctx; + IAppRuntimeContext runtimeCtx = + (IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject(); + this.transactionManager = runtimeCtx.getTransactionSubsystem().getTransactionManager(); + this.logMgr = runtimeCtx.getTransactionSubsystem().getLogManager(); + this.jobId = jobId; + this.datasetId = datasetId; + this.primaryKeyFields = primaryKeyFields; + this.tRef = new FrameTupleReference(); + this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; + this.isWriteTransaction = isWriteTransaction; + this.resourcePartition = resourcePartition; + this.isSink = isSink; + longHashes = new long[2]; + } + + @Override + public void open() throws HyracksDataException { + try { + transactionContext = transactionManager.getTransactionContext(jobId, false); + transactionContext.setWriteTxn(isWriteTransaction); + ILogMarkerCallback callback = + TaskUtil.<ILogMarkerCallback>get(ILogMarkerCallback.KEY_MARKER_CALLBACK, ctx); + logRecord = new LogRecord(callback); + if (isSink) { + return; + } + initAccessAppend(ctx); + writer.open(); + } catch (ACIDException e) { + throw new HyracksDataException(e); + } + } + + @Override + public void nextFrame(ByteBuffer buffer) throws HyracksDataException { + tAccess.reset(buffer); + int nTuple = tAccess.getTupleCount(); + for (int t = 0; t < nTuple; t++) { + if (isTemporaryDatasetWriteJob) { + /** + * This "if branch" is for writes over temporary datasets. A temporary dataset does not require any lock + * and does not generate any write-ahead update and commit log but generates flush log and job commit + * log. However, a temporary dataset still MUST guarantee no-steal policy so that this notification call + * should be delivered to PrimaryIndexOptracker and used correctly in order to decrement number of + * active operation count of PrimaryIndexOptracker. By maintaining the count correctly and only allowing + * flushing when the count is 0, it can guarantee the no-steal policy for temporary datasets, too. + */ + transactionContext.notifyOptracker(false); + } else { + tRef.reset(tAccess, t); + try { + formLogRecord(buffer, t); + logMgr.log(logRecord); + if (!isSink) { + appendTupleToFrame(t); + } + } catch (ACIDException e) { + throw new HyracksDataException(e); + } + } + } + VSizeFrame message = TaskUtil.<VSizeFrame>get(HyracksConstants.KEY_MESSAGE, ctx); + if (message != null + && MessagingFrameTupleAppender.getMessageType(message) == MessagingFrameTupleAppender.MARKER_MESSAGE) { + try { + formMarkerLogRecords(message.getBuffer()); + logMgr.log(logRecord); + } catch (ACIDException e) { + throw new HyracksDataException(e); + } + message.reset(); + message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE); + message.getBuffer().flip(); + } + } + + private void formMarkerLogRecords(ByteBuffer marker) { + TransactionUtil.formMarkerLogRecord(logRecord, transactionContext, datasetId, resourcePartition, marker); + } + + protected void formLogRecord(ByteBuffer buffer, int t) { + int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields); + TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef, + primaryKeyFields, resourcePartition, LogType.ENTITY_COMMIT); + } + + protected int computePrimaryKeyHashValue(ITupleReference tuple, int[] primaryKeyFields) { + MurmurHash128Bit.hash3_x64_128(tuple, primaryKeyFields, SEED, longHashes); + return Math.abs((int) longHashes[0]); + } + + @Override + public void fail() throws HyracksDataException { + failed = true; + if (isSink) { + return; + } + writer.fail(); + } + + @Override + public void close() throws HyracksDataException { + if (isSink) { + return; + } + flushIfNotFailed(); + writer.close(); + appender.reset(frame, true); + } + + @Override + public void setInputRecordDescriptor(int index, RecordDescriptor recordDescriptor) { + this.inputRecordDesc = recordDescriptor; + this.tAccess = new FrameTupleAccessor(inputRecordDesc); + } + + @Override + public void flush() throws HyracksDataException { + // Commit is at the end of a modification pipeline and there is no need to flush + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java new file mode 100644 index 0000000..536e657 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/CommitRuntimeFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.asterix.transaction.management.runtime; + +import org.apache.asterix.common.transactions.JobId; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntime; +import org.apache.hyracks.algebricks.runtime.base.IPushRuntimeFactory; +import org.apache.hyracks.api.context.IHyracksTaskContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public class CommitRuntimeFactory implements IPushRuntimeFactory { + + private static final long serialVersionUID = 1L; + + private final JobId jobId; + private final int datasetId; + private final int[] primaryKeyFields; + private final boolean isTemporaryDatasetWriteJob; + private final boolean isWriteTransaction; + private final int upsertVarIdx; + private int[] datasetPartitions; + private final boolean isSink; + + public CommitRuntimeFactory(JobId jobId, int datasetId, int[] primaryKeyFields, boolean isTemporaryDatasetWriteJob, + boolean isWriteTransaction, int upsertVarIdx, int[] datasetPartitions, boolean isSink) { + this.jobId = jobId; + this.datasetId = datasetId; + this.primaryKeyFields = primaryKeyFields; + this.isTemporaryDatasetWriteJob = isTemporaryDatasetWriteJob; + this.isWriteTransaction = isWriteTransaction; + this.upsertVarIdx = upsertVarIdx; + this.datasetPartitions = datasetPartitions; + this.isSink = isSink; + } + + @Override + public String toString() { + return "commit"; + } + + @Override + public IPushRuntime createPushRuntime(IHyracksTaskContext ctx) throws HyracksDataException { + if (upsertVarIdx >= 0) { + return new UpsertCommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, + isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], + upsertVarIdx, isSink); + } else { + return new CommitRuntime(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, + isWriteTransaction, datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java new file mode 100644 index 0000000..9b2fe36 --- /dev/null +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/UpsertCommitRuntime.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.transaction.management.runtime; + +import java.nio.ByteBuffer; + +import org.apache.asterix.common.transactions.JobId; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.asterix.dataflow.data.nontagged.serde.ABooleanSerializerDeserializer; +import org.apache.hyracks.api.context.IHyracksTaskContext; + +public class UpsertCommitRuntime extends CommitRuntime { + private final int upsertIdx; + + public UpsertCommitRuntime(IHyracksTaskContext ctx, JobId jobId, int datasetId, int[] primaryKeyFields, + boolean isTemporaryDatasetWriteJob, boolean isWriteTransaction, int resourcePartition, int upsertIdx, + boolean isSink) { + super(ctx, jobId, datasetId, primaryKeyFields, isTemporaryDatasetWriteJob, isWriteTransaction, + resourcePartition, isSink); + this.upsertIdx = upsertIdx; + } + + @Override + protected void formLogRecord(ByteBuffer buffer, int t) { + boolean isNull = ABooleanSerializerDeserializer.getBoolean(buffer.array(), tAccess.getFieldSlotsLength() + + tAccess.getTupleStartOffset(t) + tAccess.getFieldStartOffset(t, upsertIdx) + 1); + if (isNull) { + // Previous record not found (insert) + super.formLogRecord(buffer, t); + } else { + // Previous record found (delete + insert) + int pkHash = computePrimaryKeyHashValue(tRef, primaryKeyFields); + TransactionUtil.formEntityCommitLogRecord(logRecord, transactionContext, datasetId, pkHash, tRef, + primaryKeyFields, resourcePartition, LogType.UPSERT_ENTITY_COMMIT); + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/910303b4/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java index db0b483..3df27ad 100644 --- a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java +++ b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/primitive/BooleanPointable.java @@ -55,7 +55,7 @@ public final class BooleanPointable extends AbstractPointable implements IHashab }; public static boolean getBoolean(byte[] bytes, int start) { - return bytes[start] == 0 ? false : true; + return bytes[start] != 0; } public static void setBoolean(byte[] bytes, int start, boolean value) {
