This is an automated email from the ASF dual-hosted git repository.
peeyush pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new a13a66c375 [NO ISSUE][TX] Partition level atomicity implementation
a13a66c375 is described below
commit a13a66c37534fd1f0332fd1099c0041a7002580d
Author: Peeyush Gupta <[email protected]>
AuthorDate: Fri May 26 12:40:29 2023 -0700
[NO ISSUE][TX] Partition level atomicity implementation
- user model changes: yes
- storage format changes: no
- interface changes: yes
Details:
With this change, inserts/upserts/deletes on datasets
created without any type will have partition level
atomicity. To achieve this, we have made the following
changes:
- Only one insert/upsert/delete at a time on the dataset.
- Queries do not read from memory components and directly
go to valid disk components.
- No locks taken by either writes or reads
- No logs written.
- The disk components generated by the operation are marked
as valid and the checkpoint is updated on operation completion
rather than on flush.
Change-Id: I8ed0fac37e026c909c986e6d844c3fae3833911e
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/17559
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Peeyush Gupta <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
---
.../asterix/app/io/PersistedResourceRegistry.java | 3 +
.../dataflow/TestPrimaryIndexOperationTracker.java | 5 +-
.../TestPrimaryIndexOperationTrackerFactory.java | 3 +-
.../common/context/DatasetLifecycleManager.java | 13 +++-
.../context/PrimaryIndexOperationTracker.java | 77 +++++++++++++++++--
.../LSMInsertDeleteOperatorNodePushable.java | 32 ++++++++
.../AtomicLSMIOOperationCallback.java | 54 ++++++++++++++
.../AtomicLSMIndexIOOperationCallbackFactory.java | 62 ++++++++++++++++
.../ioopcallbacks/LSMIOOperationCallback.java | 8 +-
.../metadata/bootstrap/MetadataBootstrap.java | 21 +++---
.../ArrayBTreeResourceFactoryProvider.java | 3 +-
.../declared/BTreeResourceFactoryProvider.java | 5 +-
.../apache/asterix/metadata/entities/Dataset.java | 26 ++++++-
.../metadata/entities/InternalDatasetDetails.java | 4 +
.../DatasetTupleTranslator.java | 3 +-
.../asterix/metadata/utils/MetadataLockUtil.java | 2 +-
.../LSMPrimaryInsertOperatorNodePushable.java | 44 +++++++++--
.../LSMPrimaryUpsertOperatorNodePushable.java | 58 ++++++++++++---
.../LSMSecondaryUpsertOperatorNodePushable.java | 22 ++++--
...ryUpsertWithNestedPlanOperatorNodePushable.java | 14 +++-
.../management/runtime/NoOpCommitRuntime.java | 80 ++++++++++++++++++++
.../runtime/NoOpCommitRuntimeFactory.java | 46 ++++++++++++
.../apache/hyracks/api/util/HyracksConstants.java | 2 +
.../am/lsm/btree/LSMBTreeOperatorTestHelper.java | 2 +-
...ndexInsertUpdateDeleteOperatorNodePushable.java | 3 +
.../dataflow/LSMColumnBTreeLocalResource.java | 13 ++--
.../LSMColumnBTreeLocalResourceFactory.java | 6 +-
.../lsm/btree/column/impls/lsm/LSMColumnBTree.java | 6 +-
.../lsm/btree/column/utils/LSMColumnBTreeUtil.java | 4 +-
.../lsm/btree/dataflow/LSMBTreeLocalResource.java | 15 ++--
.../dataflow/LSMBTreeLocalResourceFactory.java | 8 +-
.../storage/am/lsm/btree/impls/LSMBTree.java | 4 +-
.../storage/am/lsm/btree/utils/LSMBTreeUtil.java | 19 ++++-
.../storage/am/lsm/common/api/ILSMIndex.java | 23 ++++++
...ndexInsertUpdateDeleteOperatorNodePushable.java | 14 ++++
.../am/lsm/common/impls/AbstractLSMIndex.java | 86 +++++++++++++++++++++-
.../storage/am/lsm/common/impls/LSMHarness.java | 10 ++-
.../storage/am/lsm/btree/impl/TestLsmBtree.java | 2 +-
.../lsm/btree/impl/TestLsmBtreeLocalResource.java | 4 +-
.../impl/TestLsmBtreeLocalResourceFactory.java | 2 +-
40 files changed, 715 insertions(+), 93 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
index 077b657e33..e754c77175 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/io/PersistedResourceRegistry.java
@@ -31,6 +31,7 @@ import
org.apache.asterix.common.context.CorrelatedPrefixMergePolicyFactory;
import org.apache.asterix.common.context.DatasetInfoProvider;
import org.apache.asterix.common.context.DatasetLSMComponentIdGeneratorFactory;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
+import
org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
import org.apache.asterix.common.library.LibraryDescriptor;
@@ -190,6 +191,8 @@ public class PersistedResourceRegistry implements
IPersistedResourceRegistry {
// ILSMOperationTrackerFactory
registeredClasses.put("NoOpIOOperationCallbackFactory",
NoOpIOOperationCallbackFactory.class);
registeredClasses.put("LSMBTreeIOOperationCallbackFactory",
LSMIndexIOOperationCallbackFactory.class);
+ registeredClasses.put("AtomicLSMBTreeIOOperationCallbackFactory",
+ AtomicLSMIndexIOOperationCallbackFactory.class);
registeredClasses.put("LSMIndexPageWriteCallbackFactory",
LSMIndexPageWriteCallbackFactory.class);
registeredClasses.put("NoOpPageWriteCallbackFactory",
NoOpPageWriteCallbackFactory.class);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
index 250c25f7dc..8f482ee4a9 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.asterix.common.context.DatasetInfo;
import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -34,8 +35,8 @@ public class TestPrimaryIndexOperationTracker extends
PrimaryIndexOperationTrack
private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>();
public TestPrimaryIndexOperationTracker(int datasetID, int partition,
ILogManager logManager, DatasetInfo dsInfo,
- ILSMComponentIdGenerator idGenerator) {
- super(datasetID, partition, logManager, dsInfo, idGenerator);
+ ILSMComponentIdGenerator idGenerator,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+ super(datasetID, partition, logManager, dsInfo, idGenerator,
indexCheckpointManagerProvider);
}
public void addCallback(ITestOpCallback<Void> callback) {
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
index 38fdf5689d..8cf708bc6f 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java
@@ -60,7 +60,8 @@ public class TestPrimaryIndexOperationTrackerFactory extends
PrimaryIndexOperati
Field opTrackersField =
DatasetResource.class.getDeclaredField("datasetPrimaryOpTrackers");
opTracker = new TestPrimaryIndexOperationTracker(datasetId,
partition,
appCtx.getTransactionSubsystem().getLogManager(),
dsr.getDatasetInfo(),
- dslcManager.getComponentIdGenerator(datasetId,
partition, resource.getPath()));
+ dslcManager.getComponentIdGenerator(datasetId,
partition, resource.getPath()),
+ appCtx.getIndexCheckpointManagerProvider());
replaceMapEntry(opTrackersField, dsr, partition, opTracker);
}
return opTracker;
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index 117b4fc4d0..f9c410bb44 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -339,7 +339,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
ILSMComponentIdGenerator idGenerator =
new
LSMComponentIdGenerator(storageProperties.getMemoryComponentsNum(),
lastValidId);
PrimaryIndexOperationTracker opTracker = new
PrimaryIndexOperationTracker(dataset.getDatasetID(), partition,
- logManager, dataset.getDatasetInfo(), idGenerator);
+ logManager, dataset.getDatasetInfo(), idGenerator,
indexCheckpointManagerProvider);
dataset.setPrimaryIndexOperationTracker(partition, opTracker);
dataset.setIdGenerator(partition, idGenerator);
}
@@ -424,7 +424,12 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
return;
}
// ensure all in-flight flushes gets scheduled
- logManager.log(waitLog);
+ final boolean requiresWaitLog =
+ dsInfo.getIndexes().values().stream().noneMatch(indexInfo ->
indexInfo.getIndex().isAtomic());
+ if (requiresWaitLog) {
+ logManager.log(waitLog);
+ }
+
for (PrimaryIndexOperationTracker primaryOpTracker :
dsr.getOpTrackers()) {
if (!partitions.test(primaryOpTracker.getPartition())) {
continue;
@@ -439,7 +444,9 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
primaryOpTracker.flushIfNeeded();
}
// ensure requested flushes were scheduled
- logManager.log(waitLog);
+ if (requiresWaitLog) {
+ logManager.log(waitLog);
+ }
if (!asyncFlush) {
List<FlushOperation> flushes = new ArrayList<>();
for (PrimaryIndexOperationTracker primaryOpTracker :
dsr.getOpTrackers()) {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index 2704e6429b..d9456d2b03 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -28,14 +28,18 @@ import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.LSMIOOperationCallback;
import org.apache.asterix.common.metadata.MetadataIndexImmutableProperties;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.asterix.common.transactions.AbstractOperationCallback;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.TransactionUtil;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -50,6 +54,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker;
import
org.apache.hyracks.storage.am.lsm.common.api.IoOperationCompleteListener;
import org.apache.hyracks.storage.am.lsm.common.api.LSMOperationType;
import org.apache.hyracks.storage.am.lsm.common.impls.FlushOperation;
+import
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
import org.apache.hyracks.storage.common.ISearchOperationCallback;
@@ -70,14 +75,17 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
private boolean flushLogCreated = false;
private final Map<String, FlushOperation> scheduledFlushes = new
HashMap<>();
private long lastFlushTime = System.nanoTime();
+ private final Map<String, FlushOperation> lastFlushOperation = new
HashMap<>();
+ private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
public PrimaryIndexOperationTracker(int datasetID, int partition,
ILogManager logManager, DatasetInfo dsInfo,
- ILSMComponentIdGenerator idGenerator) {
+ ILSMComponentIdGenerator idGenerator,
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
super(datasetID, dsInfo);
this.partition = partition;
this.logManager = logManager;
this.numActiveOperations = new AtomicInteger();
this.idGenerator = idGenerator;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
}
@Override
@@ -101,7 +109,7 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
}
public synchronized void flushIfNeeded() throws HyracksDataException {
- if (canSafelyFlush()) {
+ if (canSafelyFlush() && !isFlushLogCreated()) {
flushIfRequested();
}
}
@@ -163,9 +171,16 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
synchronized (opTracker) {
ILSMMemoryComponent memComponent =
lsmIndex.getCurrentMemoryComponent();
if (memComponent.getWriterCount() > 0) {
- throw new IllegalStateException(
- "Can't request a flush on a component with
writers inside: Index:" + lsmIndex
- + " Component:" + memComponent);
+ if (lsmIndex.isAtomic()) {
+ LOGGER.debug(
+ "Can't request a flush on a component with
writers inside: Index: {} Component: {}",
+ lsmIndex, memComponent);
+ return;
+ } else {
+ throw new IllegalStateException(
+ "Can't request a flush on a component with
writers inside: Index:" + lsmIndex
+ + " Component:" + memComponent);
+ }
}
if (memComponent.getState() ==
ComponentState.READABLE_WRITABLE && memComponent.isModified()) {
memComponent.setUnwritable();
@@ -180,7 +195,7 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
+ " and partition " + partition + " and is modified
but its component id is null");
}
LogRecord logRecord = new LogRecord();
- if (dsInfo.isDurable()) {
+ if (dsInfo.isDurable() && !primaryLsmIndex.isAtomic()) {
/*
* Generate a FLUSH log.
* Flush will be triggered when the log is written to disk by
LogFlusher.
@@ -194,7 +209,9 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
}
flushLogCreated = true;
} else {
- //trigger flush for temporary indexes without generating a
FLUSH log.
+ // trigger flush for temporary indexes and indexes on datasets
with atomic statements enabled without
+ // generating a FLUSH log.
+ flushLogCreated = true;
triggerScheduleFlush(logRecord);
}
}
@@ -221,6 +238,16 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
Map<String, Object> flushMap = new HashMap<>();
flushMap.put(LSMIOOperationCallback.KEY_FLUSH_LOG_LSN, flushLsn);
flushMap.put(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID,
nextComponentId);
+ for (ILSMIndex lsmIndex :
dsInfo.getDatasetPartitionOpenIndexes(partition)) {
+ if (lsmIndex.isPrimaryIndex()) {
+ if (lsmIndex.isCurrentMutableComponentEmpty()) {
+ LOGGER.debug("Primary index on dataset {} and
partition {} is empty... skipping flush",
+ dsInfo.getDatasetID(), partition);
+ return;
+ }
+ break;
+ }
+ }
synchronized (scheduledFlushes) {
for (ILSMIndex lsmIndex :
dsInfo.getDatasetPartitionOpenIndexes(partition)) {
ILSMIndexAccessor accessor =
lsmIndex.createAccessor(NoOpIndexAccessParameters.INSTANCE);
@@ -228,6 +255,9 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
ILSMIOOperation flush = accessor.scheduleFlush();
lastFlushTime = System.nanoTime();
scheduledFlushes.put(flush.getTarget().getRelativePath(),
(FlushOperation) flush);
+ if (lsmIndex.isAtomic()) {
+ lastFlushOperation.put(lsmIndex.getIndexIdentifier(),
(FlushOperation) flush);
+ }
flush.addCompleteListener(this);
}
}
@@ -236,6 +266,39 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker implement
}
}
+ public void commit() throws HyracksDataException {
+ LogRecord logRecord = new LogRecord();
+ triggerScheduleFlush(logRecord);
+ List<FlushOperation> flushes = new ArrayList<>(getScheduledFlushes());
+ LSMIndexUtil.waitFor(flushes);
+
+ Set<ILSMIndex> indexes =
dsInfo.getDatasetPartitionOpenIndexes(partition);
+ for (ILSMIndex lsmIndex : indexes) {
+ lsmIndex.commit();
+ }
+ for (FlushOperation flush : lastFlushOperation.values()) {
+ FileReference target = flush.getTarget();
+ Map<String, Object> map = flush.getParameters();
+ final LSMComponentId id = (LSMComponentId)
map.get(LSMIOOperationCallback.KEY_NEXT_COMPONENT_ID);
+ final ResourceReference ref =
ResourceReference.of(target.getAbsolutePath());
+ final long componentSequence =
IndexComponentFileReference.of(ref.getName()).getSequenceEnd();
+ indexCheckpointManagerProvider.get(ref).flushed(componentSequence,
0L, id.getMaxId());
+ }
+ lastFlushOperation.clear();
+
+ for (ILSMIndex lsmIndex : indexes) {
+ lsmIndex.getMergePolicy().diskComponentAdded(lsmIndex, false);
+ }
+ }
+
+ public void abort() throws HyracksDataException {
+ Set<ILSMIndex> indexes =
dsInfo.getDatasetPartitionOpenIndexes(partition);
+ for (ILSMIndex lsmIndex : indexes) {
+ lsmIndex.abort();
+ }
+ lastFlushOperation.clear();
+ }
+
@Override
public void completed(ILSMIOOperation operation) {
synchronized (scheduledFlushes) {
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
index f9934f22d2..421fb7b104 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMInsertDeleteOperatorNodePushable.java
@@ -21,6 +21,7 @@ package org.apache.asterix.common.dataflow;
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
import org.apache.asterix.common.transactions.PrimaryIndexLogMarkerCallback;
import org.apache.hyracks.api.comm.VSizeFrame;
@@ -48,6 +49,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
+import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.LocalResource;
@@ -212,6 +214,11 @@ public class LSMInsertDeleteOperatorNodePushable extends
LSMIndexInsertUpdateDel
failure = ExceptionUtils.suppress(failure, th);
}
}
+ if (failure == null && !failed) {
+ commitAtomicInsertDelete();
+ } else {
+ abortAtomicInsertDelete();
+ }
if (failure != null) {
throw HyracksDataException.create(failure);
}
@@ -219,6 +226,7 @@ public class LSMInsertDeleteOperatorNodePushable extends
LSMIndexInsertUpdateDel
@Override
public void fail() throws HyracksDataException {
+ this.failed = true;
if (writerOpen) {
writer.fail();
}
@@ -227,4 +235,28 @@ public class LSMInsertDeleteOperatorNodePushable extends
LSMIndexInsertUpdateDel
public boolean isPrimary() {
return isPrimary;
}
+
+ private void commitAtomicInsertDelete() throws HyracksDataException {
+ if (isPrimary) {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex)
index).getOperationTracker());
+ opTracker.commit();
+ }
+ }
+ }
+ }
+
+ private void abortAtomicInsertDelete() throws HyracksDataException {
+ if (isPrimary) {
+ for (IIndex index : indexes) {
+ if (((ILSMIndex) index).isAtomic()) {
+ PrimaryIndexOperationTracker opTracker =
+ ((PrimaryIndexOperationTracker) ((ILSMIndex)
index).getOperationTracker());
+ opTracker.abort();
+ }
+ }
+ }
+ }
}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
new file mode 100644
index 0000000000..111c6e7205
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIOOperationCallback.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.context.DatasetInfo;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.IoUtil;
+import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationStatus;
+import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperationType;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+public class AtomicLSMIOOperationCallback extends LSMIOOperationCallback {
+
+ public AtomicLSMIOOperationCallback(DatasetInfo dsInfo, ILSMIndex
lsmIndex, ILSMComponentId componentId,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+ super(dsInfo, lsmIndex, componentId, indexCheckpointManagerProvider);
+ }
+
+ @Override
+ public void afterFinalize(ILSMIOOperation operation) throws
HyracksDataException {
+ if (operation.getStatus() == LSMIOOperationStatus.FAILURE) {
+ return;
+ }
+ if (operation.getIOOpertionType() != LSMIOOperationType.LOAD
+ && operation.getAccessor().getOpContext().getOperation() ==
IndexOperation.DELETE_COMPONENTS) {
+ deleteComponentsFromCheckpoint(operation);
+ } else if (operation.getIOOpertionType() == LSMIOOperationType.LOAD) {
+ addComponentToCheckpoint(operation);
+ } else if (isMerge(operation)) {
+ IoUtil.delete(getOperationMaskFilePath(operation));
+ }
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
new file mode 100644
index 0000000000..3829c547cd
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AtomicLSMIndexIOOperationCallbackFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.common.ioopcallbacks;
+
+import org.apache.asterix.common.api.IDatasetInfoProvider;
+import org.apache.asterix.common.api.ILSMComponentIdGeneratorFactory;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IJsonSerializable;
+import org.apache.hyracks.api.io.IPersistedResourceRegistry;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+
+import com.fasterxml.jackson.databind.JsonNode;
+
+public class AtomicLSMIndexIOOperationCallbackFactory extends
LSMIndexIOOperationCallbackFactory {
+
+ private static final long serialVersionUID = -2617830712731546932L;
+
+ public
AtomicLSMIndexIOOperationCallbackFactory(ILSMComponentIdGeneratorFactory
idGeneratorFactory,
+ IDatasetInfoProvider datasetInfoProvider) {
+ super(idGeneratorFactory, datasetInfoProvider);
+ }
+
+ protected IIndexCheckpointManagerProvider
getIndexCheckpointManagerProvider() {
+ return ((INcApplicationContext)
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+ }
+
+ @Override
+ public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) throws
HyracksDataException {
+ return new
AtomicLSMIOOperationCallback(datasetInfoProvider.getDatasetInfo(ncCtx), index,
+ getComponentIdGenerator().getId(),
getIndexCheckpointManagerProvider());
+ }
+
+ @SuppressWarnings("squid:S1172") // unused parameter
+ public static IJsonSerializable fromJson(IPersistedResourceRegistry
registry, JsonNode json)
+ throws HyracksDataException {
+ final ILSMComponentIdGeneratorFactory idGeneratorFactory =
+ (ILSMComponentIdGeneratorFactory)
registry.deserialize(json.get("idGeneratorFactory"));
+ final IDatasetInfoProvider datasetInfoProvider =
+ (IDatasetInfoProvider)
registry.deserialize(json.get("datasetInfoProvider"));
+ return new
AtomicLSMIndexIOOperationCallbackFactory(idGeneratorFactory,
datasetInfoProvider);
+ }
+}
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
index 999636da13..9ab4848c5c 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMIOOperationCallback.java
@@ -138,7 +138,7 @@ public class LSMIOOperationCallback implements
ILSMIOOperationCallback {
}
}
- private void addComponentToCheckpoint(ILSMIOOperation operation) throws
HyracksDataException {
+ protected void addComponentToCheckpoint(ILSMIOOperation operation) throws
HyracksDataException {
// will always update the checkpoint file even if no new component was
created
FileReference target = operation.getTarget();
Map<String, Object> map = operation.getParameters();
@@ -150,7 +150,7 @@ public class LSMIOOperationCallback implements
ILSMIOOperationCallback {
indexCheckpointManagerProvider.get(ref).flushed(componentSequence,
lsn, id.getMaxId());
}
- private void deleteComponentsFromCheckpoint(ILSMIOOperation operation)
throws HyracksDataException {
+ protected void deleteComponentsFromCheckpoint(ILSMIOOperation operation)
throws HyracksDataException {
// component was deleted... if a flush, do nothing.. if a merge, must
update the checkpoint file
if (operation.getIOOpertionType() == LSMIOOperationType.MERGE) {
// Get component id of the last disk component
@@ -298,12 +298,12 @@ public class LSMIOOperationCallback implements
ILSMIOOperationCallback {
return
indexCheckpointManagerProvider.get(resourceReference).getValidComponentSequence();
}
- private boolean isMerge(ILSMIOOperation operation) {
+ protected boolean isMerge(ILSMIOOperation operation) {
return operation.getIOOpertionType() == LSMIOOperationType.MERGE
&& operation.getAccessor().getOpContext().getOperation() !=
IndexOperation.DELETE_COMPONENTS;
}
- private static FileReference getOperationMaskFilePath(ILSMIOOperation
operation) {
+ protected static FileReference getOperationMaskFilePath(ILSMIOOperation
operation) {
FileReference target = operation.getTarget();
String componentSequence =
getComponentSequence(target.getFile().getAbsolutePath());
FileReference idxRelPath = target.getParent();
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
index 8880461685..d7637f4bd2 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/bootstrap/MetadataBootstrap.java
@@ -423,16 +423,17 @@ public class MetadataBootstrap {
if (createMetadataDataset) {
final double bloomFilterFalsePositiveRate =
appContext.getStorageProperties().getBloomFilterFalsePositiveRate();
- LSMBTreeLocalResourceFactory lsmBtreeFactory = new
LSMBTreeLocalResourceFactory(
- storageComponentProvider.getStorageManager(), typeTraits,
cmpFactories, null, null, null,
- opTrackerFactory, ioOpCallbackFactory,
pageWriteCallbackFactory,
- storageComponentProvider.getMetadataPageManagerFactory(),
- new AsterixVirtualBufferCacheProvider(datasetId),
- storageComponentProvider.getIoOperationSchedulerProvider(),
- appContext.getMetadataMergePolicyFactory(),
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES,
- true, bloomFilterKeyFields, bloomFilterFalsePositiveRate,
true, null,
- NoOpCompressorDecompressorFactory.INSTANCE, true,
-
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL),
NullIntrospector.INSTANCE, false);
+ LSMBTreeLocalResourceFactory lsmBtreeFactory =
+ new
LSMBTreeLocalResourceFactory(storageComponentProvider.getStorageManager(),
typeTraits,
+ cmpFactories, null, null, null, opTrackerFactory,
ioOpCallbackFactory,
+ pageWriteCallbackFactory,
storageComponentProvider.getMetadataPageManagerFactory(),
+ new AsterixVirtualBufferCacheProvider(datasetId),
+
storageComponentProvider.getIoOperationSchedulerProvider(),
+ appContext.getMetadataMergePolicyFactory(),
+
StorageConstants.DEFAULT_COMPACTION_POLICY_PROPERTIES, true,
bloomFilterKeyFields,
+ bloomFilterFalsePositiveRate, true, null,
NoOpCompressorDecompressorFactory.INSTANCE, true,
+
TypeTraitProvider.INSTANCE.getTypeTrait(BuiltinType.ANULL),
NullIntrospector.INSTANCE,
+ false, false);
DatasetLocalResourceFactory dsLocalResourceFactory =
new DatasetLocalResourceFactory(datasetId,
lsmBtreeFactory);
// TODO(amoudi) Creating the index should be done through the same
code path as
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
index 9a2821eece..a61db6f480 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/ArrayBTreeResourceFactoryProvider.java
@@ -100,7 +100,8 @@ public class ArrayBTreeResourceFactoryProvider implements
IResourceFactoryProvid
pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties, true, null,
bloomFilterFalsePositiveRate,
index.isPrimaryIndex(), btreeFields,
compDecompFactory, false,
- typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
NullIntrospector.INSTANCE, false);
+ typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
NullIntrospector.INSTANCE, false,
+ dataset.isAtomic());
default:
throw new
CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
dataset.getDatasetType().toString());
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
index 88e96e73ab..ab4b58552c 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/BTreeResourceFactoryProvider.java
@@ -113,7 +113,7 @@ public class BTreeResourceFactoryProvider implements
IResourceFactoryProvider {
mergePolicyFactory, mergePolicyProperties, true,
bloomFilterFields,
bloomFilterFalsePositiveRate,
index.isPrimaryIndex(), btreeFields, compDecompFactory,
hasBloomFilter,
typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
- NullIntrospector.INSTANCE,
isSecondaryNoIncrementalMaintenance);
+ NullIntrospector.INSTANCE,
isSecondaryNoIncrementalMaintenance, dataset.isAtomic());
} else {
//Column
List<Integer> keySourceIndicator =
@@ -127,7 +127,8 @@ public class BTreeResourceFactoryProvider implements
IResourceFactoryProvider {
pageWriteCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
mergePolicyFactory, mergePolicyProperties,
bloomFilterFields, bloomFilterFalsePositiveRate,
btreeFields, compDecompFactory,
typeTraitProvider.getTypeTrait(BuiltinType.ANULL),
- NullIntrospector.INSTANCE,
isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+ NullIntrospector.INSTANCE,
isSecondaryNoIncrementalMaintenance, columnManagerFactory,
+ dataset.isAtomic());
}
default:
throw new
CompilationException(ErrorCode.COMPILATION_UNKNOWN_DATASET_TYPE,
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
index b97880e350..e8baeb141b 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/Dataset.java
@@ -37,6 +37,7 @@ import
org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
+import
org.apache.asterix.common.ioopcallbacks.AtomicLSMIndexIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMIndexIOOperationCallbackFactory;
import
org.apache.asterix.common.ioopcallbacks.LSMIndexPageWriteCallbackFactory;
import org.apache.asterix.common.metadata.DatasetFullyQualifiedName;
@@ -81,6 +82,7 @@ import
org.apache.asterix.transaction.management.opcallbacks.SecondaryIndexSearc
import
org.apache.asterix.transaction.management.opcallbacks.UpsertOperationCallbackFactory;
import
org.apache.asterix.transaction.management.resource.DatasetLocalResourceFactory;
import org.apache.asterix.transaction.management.runtime.CommitRuntimeFactory;
+import
org.apache.asterix.transaction.management.runtime.NoOpCommitRuntimeFactory;
import
org.apache.asterix.transaction.management.service.transaction.DatasetIdFactory;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.mutable.MutableObject;
@@ -473,7 +475,12 @@ public class Dataset implements IMetadataEntity<Dataset>,
IDataset {
*/
@SuppressWarnings("squid:S1172")
public ILSMIOOperationCallbackFactory getIoOperationCallbackFactory(Index
index) throws AlgebricksException {
- return new
LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(),
getDatasetInfoProvider());
+ if (isAtomic()) {
+ return new
AtomicLSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(),
+ getDatasetInfoProvider());
+ } else {
+ return new
LSMIndexIOOperationCallbackFactory(getComponentIdGeneratorFactory(),
getDatasetInfoProvider());
+ }
}
public ILSMPageWriteCallbackFactory getPageWriteCallbackFactory() throws
AlgebricksException {
@@ -499,6 +506,11 @@ public class Dataset implements IMetadataEntity<Dataset>,
IDataset {
return new DatasetInfoProvider(getDatasetId());
}
+    /**
+     * Tells whether this dataset is treated as atomic.
+     *
+     * @return {@code true} only when the dataset type is {@code DatasetType.INTERNAL} and its
+     *         {@link InternalDatasetDetails} report that it was declared without a type specification
+     */
+    public boolean isAtomic() {
+        if (datasetType != DatasetType.INTERNAL) {
+            return false;
+        }
+        // Safe to downcast here: only reached for INTERNAL datasets, as in the original expression.
+        InternalDatasetDetails internalDetails = (InternalDatasetDetails) getDatasetDetails();
+        return internalDetails.isDatasetWithoutTypeSpecification();
+    }
+
/**
* Get search callback factory for this dataset with the passed index and
operation
*
@@ -514,6 +526,10 @@ public class Dataset implements IMetadataEntity<Dataset>,
IDataset {
public ISearchOperationCallbackFactory
getSearchCallbackFactory(IStorageComponentProvider storageComponentProvider,
Index index, IndexOperation op, int[] primaryKeyFields, int[]
primaryKeyFieldsInSecondaryIndex,
boolean proceedIndexOnlyPlan) throws AlgebricksException {
+ if (isAtomic()) {
+ return NoOpOperationCallbackFactory.INSTANCE;
+ }
+
if (index.isPrimaryIndex()) {
/**
* Due to the read-committed isolation level,
@@ -566,6 +582,10 @@ public class Dataset implements IMetadataEntity<Dataset>,
IDataset {
public IModificationOperationCallbackFactory
getModificationCallbackFactory(
IStorageComponentProvider componentProvider, Index index,
IndexOperation op, int[] primaryKeyFields)
throws AlgebricksException {
+ if (isAtomic()) {
+ return NoOpOperationCallbackFactory.INSTANCE;
+ }
+
if (index.isPrimaryIndex()) {
return op == IndexOperation.UPSERT || op == IndexOperation.INSERT
? new UpsertOperationCallbackFactory(getDatasetId(),
@@ -636,6 +656,10 @@ public class Dataset implements IMetadataEntity<Dataset>,
IDataset {
IBinaryHashFunctionFactory[] pkHashFunFactories =
getPrimaryHashFunctionFactories(metadataProvider);
ITuplePartitionerFactory partitionerFactory = new
FieldHashPartitionerFactory(primaryKeyFieldPermutation,
pkHashFunFactories, datasetPartitions.length);
+ if (isAtomic()) {
+ return new NoOpCommitRuntimeFactory(datasetId,
primaryKeyFieldPermutation,
+ metadataProvider.isWriteTransaction(), datasetPartitions,
isSink, partitionerFactory);
+ }
return new CommitRuntimeFactory(datasetId, primaryKeyFieldPermutation,
metadataProvider.isWriteTransaction(),
datasetPartitions, isSink, partitionerFactory);
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
index fd19937a2c..5abad5d06a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/InternalDatasetDetails.java
@@ -145,6 +145,10 @@ public class InternalDatasetDetails implements
IDatasetDetails {
return filterSourceIndicator;
}
+    /**
+     * @return the value of the {@code isDatasetWithoutTypeSpecification} flag held by these details
+     */
+    public boolean isDatasetWithoutTypeSpecification() {
+        return this.isDatasetWithoutTypeSpecification;
+    }
+
@Override
public DatasetType getDatasetType() {
return DatasetType.INTERNAL;
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
index ac798aa735..39500539ae 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entitytupletranslators/DatasetTupleTranslator.java
@@ -219,9 +219,10 @@ public class DatasetTupleTranslator extends
AbstractTupleTranslator<Dataset> {
}
}
+ boolean isDatasetWithoutTypeSpec = primaryKeyTypes != null &&
!primaryKeyTypes.isEmpty();
datasetDetails = new InternalDatasetDetails(fileStructure,
partitioningStrategy, partitioningKey,
partitioningKey, keyFieldSourceIndicator,
primaryKeyTypes, autogenerated, filterSourceIndicator,
- filterField);
+ filterField, isDatasetWithoutTypeSpec);
break;
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 1de387fbab..8ee8f11b6a 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -248,7 +248,7 @@ public class MetadataLockUtil implements IMetadataLockUtil {
public void insertDeleteUpsertBegin(IMetadataLockManager lockMgr, LockList
locks, DataverseName dataverseName,
String datasetName) throws AlgebricksException {
lockMgr.acquireDataverseReadLock(locks, dataverseName);
- lockMgr.acquireDatasetModifyLock(locks, dataverseName, datasetName);
+ lockMgr.acquireDatasetExclusiveModificationLock(locks, dataverseName,
datasetName);
}
@Override
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
index 3762e82991..6e767ed2bf 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryInsertOperatorNodePushable.java
@@ -21,6 +21,7 @@ package org.apache.asterix.runtime.operators;
import java.nio.ByteBuffer;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.dataflow.NoOpFrameOperationCallbackFactory;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -53,6 +54,7 @@ import
org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -61,6 +63,7 @@ import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexAccessor;
import org.apache.hyracks.storage.common.IIndexCursor;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.MultiComparator;
import it.unimi.dsi.fastutil.ints.Int2ObjectMap;
@@ -76,7 +79,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
private MultiComparator keySearchCmp;
private RangePredicate searchPred;
private final IIndexCursor[] cursors;
- private final LockThenSearchOperationCallback[] searchCallbacks;
+ private final ISearchOperationCallback[] searchCallbacks;
private final ISearchOperationCallbackFactory searchCallbackFactory;
private final IFrameTupleProcessor[] processors;
private final LSMTreeIndexAccessor[] lsmAccessorForKeyIndexes;
@@ -101,7 +104,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
modCallbackFactory, null, tuplePartitionerFactory,
partitionsMap);
this.sourceLoc = sourceLoc;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
- this.searchCallbacks = new
LockThenSearchOperationCallback[partitions.length];
+ this.searchCallbacks = new ISearchOperationCallback[partitions.length];
this.cursors = new IIndexCursor[partitions.length];
this.lsmAccessorForUniqunessChecks = new
LSMTreeIndexAccessor[partitions.length];
this.lsmAccessorForKeyIndexes = new
LSMTreeIndexAccessor[partitions.length];
@@ -138,7 +141,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
IIndexCursor cursor = cursors[i];
IIndexAccessor indexAccessor = indexAccessors[i];
LSMTreeIndexAccessor lsmAccessorForKeyIndex =
lsmAccessorForKeyIndexes[i];
- LockThenSearchOperationCallback searchCallback =
searchCallbacks[i];
+ ISearchOperationCallback searchCallback = searchCallbacks[i];
processors[i] = new IFrameTupleProcessor() {
@Override
public void process(FrameTupleAccessor accessor,
ITupleReference tuple, int index)
@@ -155,7 +158,9 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
try {
if (cursor.hasNext()) {
// duplicate, skip
- searchCallback.release();
+ if (searchCallback instanceof
LockThenSearchOperationCallback) {
+ ((LockThenSearchOperationCallback)
searchCallback).release();
+ }
duplicate = true;
}
} finally {
@@ -226,7 +231,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
}
modCallbacks[i] =
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
ctx, this);
- searchCallbacks[i] = (LockThenSearchOperationCallback)
searchCallbackFactory
+ searchCallbacks[i] = searchCallbackFactory
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
IIndexAccessParameters iap = new
IndexAccessParameters(modCallbacks[i], NoOpOperationCallback.INSTANCE);
indexAccessors[i] = index.createAccessor(iap);
@@ -240,6 +245,8 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
new
IndexAccessParameters(NoOpOperationCallback.INSTANCE, searchCallbacks[i]);
lsmAccessorForUniqunessChecks[i] =
(LSMTreeIndexAccessor)
indexForUniquessCheck.createAccessor(iapForUniquenessCheck);
+ setAtomicOpContextIfAtomic(indexForUniquessCheck,
lsmAccessorForUniqunessChecks[i]);
+
cursors[i] =
lsmAccessorForUniqunessChecks[i].createSearchCursor(false);
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) index,
appCtx.getTransactionSubsystem().getLogManager());
@@ -311,6 +318,12 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
failure = CleanupUtils.close(writer, failure);
failure = CleanupUtils.close(indexHelpers, failure);
failure = CleanupUtils.close(keyIndexHelpers, failure);
+ if (failure == null && !failed) {
+ commitAtomicInsert();
+ } else {
+ abortAtomicInsert();
+ }
+
if (failure != null) {
throw HyracksDataException.create(failure);
}
@@ -318,6 +331,7 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
@@ -325,4 +339,24 @@ public class LSMPrimaryInsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
public void flush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
+
+    /**
+     * Visits every index held by this operator and, for each one that reports itself as atomic,
+     * calls {@code commit()} on its {@link PrimaryIndexOperationTracker}.
+     *
+     * @throws HyracksDataException propagated from the calls made on the index or its tracker
+     */
+    private void commitAtomicInsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            ILSMIndex lsmIndex = (ILSMIndex) index;
+            if (!lsmIndex.isAtomic()) {
+                continue;
+            }
+            // The operation tracker of an atomic index is downcast exactly as before.
+            ((PrimaryIndexOperationTracker) lsmIndex.getOperationTracker()).commit();
+        }
+    }
+
+    /**
+     * Visits every index held by this operator and, for each one that reports itself as atomic,
+     * calls {@code abort()} on its {@link PrimaryIndexOperationTracker}.
+     * <p>
+     * This is invoked from {@code close()} on the failure path, so it must tolerate a partially
+     * initialised operator.
+     *
+     * @throws HyracksDataException propagated from the calls made on the index or its tracker
+     */
+    private void abortAtomicInsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            // NOTE(review): a slot is presumably still null when open() failed before that
+            // partition's index was obtained - confirm against the superclass. Skipping it keeps
+            // close() from dying with an NPE that would hide the original failure.
+            if (index == null) {
+                continue;
+            }
+            ILSMIndex lsmIndex = (ILSMIndex) index;
+            if (lsmIndex.isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
+                opTracker.abort();
+            }
+        }
+    }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
index 3a9a020caf..03130ae24a 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMPrimaryUpsertOperatorNodePushable.java
@@ -26,6 +26,7 @@ import java.text.SimpleDateFormat;
import java.util.Date;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
import org.apache.asterix.common.dataflow.LSMIndexUtil;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.ILogMarkerCallback;
@@ -68,6 +69,7 @@ import
org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallback;
import
org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.IFrameTupleProcessor;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
@@ -76,6 +78,7 @@ import org.apache.hyracks.storage.common.IIndex;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
import org.apache.hyracks.storage.common.IIndexCursor;
import org.apache.hyracks.storage.common.IModificationOperationCallback;
+import org.apache.hyracks.storage.common.ISearchOperationCallback;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.projection.ITupleProjector;
import org.apache.hyracks.storage.common.projection.ITupleProjectorFactory;
@@ -119,7 +122,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
private final boolean hasMeta;
private final int filterFieldIndex;
private final int metaFieldIndex;
- protected final LockThenSearchOperationCallback[] searchCallbacks;
+ protected final ISearchOperationCallback[] searchCallbacks;
protected final IFrameOperationCallback[] frameOpCallbacks;
private final IFrameOperationCallbackFactory frameOpCallbackFactory;
private final ISearchOperationCallbackFactory searchCallbackFactory;
@@ -142,7 +145,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
modCallbackFactory, null, tuplePartitionerFactory,
partitionsMap);
this.hasSecondaries = hasSecondaries;
this.frameOpCallbacks = new IFrameOperationCallback[partitions.length];
- this.searchCallbacks = new
LockThenSearchOperationCallback[partitions.length];
+ this.searchCallbacks = new ISearchOperationCallback[partitions.length];
this.cursors = new IIndexCursor[partitions.length];
this.processors = new IFrameTupleProcessor[partitions.length];
this.key = new PermutingFrameTupleReference();
@@ -180,7 +183,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
for (int i = 0; i < partitions.length; i++) {
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor)
indexAccessors[i];
IIndexCursor cursor = cursors[i];
- LockThenSearchOperationCallback searchCallback =
searchCallbacks[i];
+ ISearchOperationCallback searchCallback = searchCallbacks[i];
IModificationOperationCallback modCallback = modCallbacks[i];
IFrameOperationCallback frameOpCallback = frameOpCallbacks[i];
processors[i] = new IFrameTupleProcessor() {
@@ -189,8 +192,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
throws HyracksDataException {
try {
tb.reset();
- AbstractIndexModificationOperationCallback
abstractModCallback =
- (AbstractIndexModificationOperationCallback)
modCallback;
+ IModificationOperationCallback abstractModCallback =
modCallback;
boolean recordWasInserted = false;
boolean recordWasDeleted = false;
boolean isDelete = isDeleteOperation(tuple,
numOfPrimaryKeys);
@@ -223,11 +225,17 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
if (isDelete && prevTuple != null) {
// Only delete if it is a delete and not upsert
// And previous tuple with the same key was found
- abstractModCallback.setOp(Operation.DELETE);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback)
+ .setOp(Operation.DELETE);
+ }
lsmAccessor.forceDelete(tuple);
recordWasDeleted = true;
} else if (!isDelete) {
- abstractModCallback.setOp(Operation.UPSERT);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback)
+ .setOp(Operation.UPSERT);
+ }
lsmAccessor.forceUpsert(tuple);
recordWasInserted = true;
}
@@ -296,11 +304,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
}
modCallbacks[i] =
modOpCallbackFactory.createModificationOperationCallback(indexHelper.getResource(),
ctx, this);
- searchCallbacks[i] = (LockThenSearchOperationCallback)
searchCallbackFactory
+ searchCallbacks[i] = searchCallbackFactory
.createSearchOperationCallback(indexHelper.getResource().getId(), ctx, this);
IIndexAccessParameters iap = new
IndexAccessParameters(modCallbacks[i], searchCallbacks[i]);
iap.getParameters().put(HyracksConstants.TUPLE_PROJECTOR,
tupleProjector);
indexAccessors[i] = indexes[i].createAccessor(iap);
+ setAtomicOpContextIfAtomic(indexes[i], indexAccessors[i]);
cursors[i] = ((LSMTreeIndexAccessor)
indexAccessors[i]).createSearchCursor(false);
LSMIndexUtil.checkAndSetFirstLSN((AbstractLSMIndex) indexes[i],
appCtx.getTransactionSubsystem().getLogManager());
@@ -353,7 +362,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
}
protected void writeOutput(int tupleIndex, boolean recordWasInserted,
boolean recordWasDeleted,
- LockThenSearchOperationCallback searchCallback) throws IOException
{
+ ISearchOperationCallback searchCallback) throws IOException {
if (recordWasInserted || recordWasDeleted) {
frameTuple.reset(accessor, tupleIndex);
for (int i = 0; i < frameTuple.getFieldCount(); i++) {
@@ -363,7 +372,9 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
FrameUtils.appendToWriter(writer, appender,
tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize());
} else {
try {
- searchCallback.release();
+ if (searchCallback instanceof LockThenSearchOperationCallback)
{
+ ((LockThenSearchOperationCallback)
searchCallback).release();
+ }
} catch (ACIDException e) {
throw HyracksDataException.create(e);
}
@@ -506,6 +517,12 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
failure = CleanupUtils.destroy(failure, cursors);
failure = CleanupUtils.close(writer, failure);
failure = CleanupUtils.close(indexHelpers, failure);
+ if (failure == null && !failed) {
+ commitAtomicUpsert();
+ } else {
+ abortAtomicUpsert();
+ }
+
if (failure != null) {
throw HyracksDataException.create(failure);
}
@@ -531,6 +548,7 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
@Override
public void fail() throws HyracksDataException {
+ failed = true;
writer.fail();
}
@@ -538,4 +556,24 @@ public class LSMPrimaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdateDe
public void flush() throws HyracksDataException {
// No op since nextFrame flushes by default
}
+
+    /**
+     * Visits every index held by this operator and, for each one that reports itself as atomic,
+     * calls {@code commit()} on its {@link PrimaryIndexOperationTracker}.
+     *
+     * @throws HyracksDataException propagated from the calls made on the index or its tracker
+     */
+    private void commitAtomicUpsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            ILSMIndex lsmIndex = (ILSMIndex) index;
+            if (!lsmIndex.isAtomic()) {
+                continue;
+            }
+            // The operation tracker of an atomic index is downcast exactly as before.
+            ((PrimaryIndexOperationTracker) lsmIndex.getOperationTracker()).commit();
+        }
+    }
+
+    /**
+     * Visits every index held by this operator and, for each one that reports itself as atomic,
+     * calls {@code abort()} on its {@link PrimaryIndexOperationTracker}.
+     * <p>
+     * This is invoked from {@code close()} on the failure path, so it must tolerate a partially
+     * initialised operator.
+     *
+     * @throws HyracksDataException propagated from the calls made on the index or its tracker
+     */
+    private void abortAtomicUpsert() throws HyracksDataException {
+        for (IIndex index : indexes) {
+            // NOTE(review): a slot is presumably still null when open() failed before that
+            // partition's index was obtained - confirm against the superclass. Skipping it keeps
+            // close() from dying with an NPE that would hide the original failure.
+            if (index == null) {
+                continue;
+            }
+            ILSMIndex lsmIndex = (ILSMIndex) index;
+            if (lsmIndex.isAtomic()) {
+                PrimaryIndexOperationTracker opTracker =
+                        (PrimaryIndexOperationTracker) lsmIndex.getOperationTracker();
+                opTracker.abort();
+            }
+        }
+    }
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
index 6d5e88b0cb..3dc6d379c9 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertOperatorNodePushable.java
@@ -41,6 +41,7 @@ import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import
org.apache.hyracks.storage.am.lsm.common.dataflow.LSMIndexInsertUpdateDeleteOperatorNodePushable;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
/**
* This operator node is used for secondary indexes with upsert operations.
@@ -101,8 +102,7 @@ public class LSMSecondaryUpsertOperatorNodePushable extends
LSMIndexInsertUpdate
int storagePartition = tuplePartitioner.partition(accessor, i);
int storageIdx =
storagePartitionId2Index.get(storagePartition);
ILSMIndexAccessor lsmAccessor = (ILSMIndexAccessor)
indexAccessors[storageIdx];
- AbstractIndexModificationOperationCallback abstractModCallback
=
- (AbstractIndexModificationOperationCallback)
modCallbacks[storageIdx];
+ IModificationOperationCallback abstractModCallback =
modCallbacks[storageIdx];
frameTuple.reset(accessor, i);
int operation =
operationInspector.getIntegerValue(frameTuple.getFieldData(operationFieldIndex),
frameTuple.getFieldStart(operationFieldIndex),
frameTuple.getFieldLength(operationFieldIndex));
@@ -111,23 +111,33 @@ public class LSMSecondaryUpsertOperatorNodePushable
extends LSMIndexInsertUpdate
if (operation == UPSERT_NEW) {
if (tupleFilterIsNull || tupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.INSERT);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback).setOp(Operation.INSERT);
+ }
lsmAccessor.forceInsert(tuple);
}
} else if (operation == UPSERT_EXISTING) {
if (!TupleUtils.equalTuples(tuple, prevTuple,
numberOfFields)) {
if (prevTupleFilterIsNull ||
prevTupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.DELETE);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback)
+ .setOp(Operation.DELETE);
+ }
lsmAccessor.forceDelete(prevTuple);
}
if (tupleFilterIsNull ||
tupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.INSERT);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback)
+ .setOp(Operation.INSERT);
+ }
lsmAccessor.forceInsert(tuple);
}
}
} else if (operation == DELETE_EXISTING) {
if (prevTupleFilterIsNull ||
prevTupleFilter.accept(frameTuple)) {
- abstractModCallback.setOp(Operation.DELETE);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback).setOp(Operation.DELETE);
+ }
lsmAccessor.forceDelete(prevTuple);
}
}
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
index 17858a3ebe..3eb9437f83 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/operators/LSMSecondaryUpsertWithNestedPlanOperatorNodePushable.java
@@ -41,6 +41,7 @@ import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFa
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.tuples.ConcatenatingTupleReference;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
+import org.apache.hyracks.storage.common.IModificationOperationCallback;
public class LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends
LSMSecondaryUpsertOperatorNodePushable {
private final NestedTupleSourceRuntime[] startOfNewKeyPipelines;
@@ -190,11 +191,13 @@ public class
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSec
int storagePartition =
tuplePartitioner.partition(tuple.getFrameTupleAccessor(),
tuple.getTupleIndex());
int storageIdx =
storagePartitionId2Index.get(storagePartition);
ILSMIndexAccessor workingLSMAccessor = (ILSMIndexAccessor)
indexAccessors[storageIdx];
- AbstractIndexModificationOperationCallback abstractModCallback
=
- (AbstractIndexModificationOperationCallback)
modCallbacks[storageIdx];
+ IModificationOperationCallback abstractModCallback =
modCallbacks[storageIdx];
// Finally, pass the tuple to our accessor. There are only two
operations: insert or delete.
if (this.isInsert) {
-
abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback)
+
.setOp(AbstractIndexModificationOperationCallback.Operation.INSERT);
+ }
try {
workingLSMAccessor.forceInsert(endTupleReference);
} catch (HyracksDataException e) {
@@ -203,7 +206,10 @@ public class
LSMSecondaryUpsertWithNestedPlanOperatorNodePushable extends LSMSec
}
}
} else {
-
abstractModCallback.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+ if (abstractModCallback instanceof
AbstractIndexModificationOperationCallback) {
+ ((AbstractIndexModificationOperationCallback)
abstractModCallback)
+
.setOp(AbstractIndexModificationOperationCallback.Operation.DELETE);
+ }
workingLSMAccessor.forceDelete(endTupleReference);
}
}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
new file mode 100644
index 0000000000..2312f15920
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntime.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import java.nio.ByteBuffer;
+
+import org.apache.asterix.common.exceptions.ACIDException;
+import org.apache.asterix.common.transactions.TxnId;
+import org.apache.hyracks.api.comm.IFrame;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
+import org.apache.hyracks.dataflow.common.io.MessagingFrameTupleAppender;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
+/**
+ * Commit runtime variant introduced by this change. The commit description
+ * states that writes on datasets with partition-level atomicity write no
+ * logs; nothing in this class creates a log record.
+ * NOTE(review): the CommitRuntime behaviour reached through super.open() is
+ * not visible in this hunk - confirm that it does no logging on this path.
+ */
+public class NoOpCommitRuntime extends CommitRuntime {
+ /**
+ * Passes every argument, unchanged and in the same order, to the
+ * CommitRuntime constructor.
+ */
+ public NoOpCommitRuntime(IHyracksTaskContext ctx, TxnId txnId, int
datasetId, int[] primaryKeyFields,
+ boolean isWriteTransaction, int resourcePartition, boolean isSink,
+ ITuplePartitionerFactory partitionerFactory, int[]
datasetPartitions) {
+ super(ctx, txnId, datasetId, primaryKeyFields, isWriteTransaction,
resourcePartition, isSink,
+ partitionerFactory, datasetPartitions);
+ }
+ /**
+ * Looks up the transaction context for txnId and records on it whether
+ * this is a write transaction. A sink runtime stops there; otherwise
+ * initAccessAppend(ctx) and super.open() are invoked, in that order.
+ *
+ * @throws HyracksDataException wrapping any ACIDException raised here
+ */
+ @Override
+ public void open() throws HyracksDataException {
+ try {
+ transactionContext =
transactionManager.getTransactionContext(txnId);
+ transactionContext.setWriteTxn(isWriteTransaction);
+ if (isSink) {
+ return;
+ }
+ initAccessAppend(ctx);
+ super.open();
+ } catch (ACIDException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ /**
+ * Walks every tuple of the incoming frame: tRef is pointed at each tuple
+ * and, unless this runtime is a sink, the tuple index is handed to
+ * appendTupleToFrame. Afterwards, if the task context holds a message
+ * frame under HyracksConstants.KEY_MESSAGE whose type is MARKER_MESSAGE,
+ * that frame is reset and rewritten to contain NULL_FEED_MESSAGE.
+ *
+ * @param buffer the frame whose tuples are processed
+ * @throws HyracksDataException wrapping any ACIDException raised while
+ *         a tuple is appended
+ */
+ @Override
+ public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
+ tAccess.reset(buffer);
+ int nTuple = tAccess.getTupleCount();
+ for (int t = 0; t < nTuple; t++) {
+ tRef.reset(tAccess, t);
+ try {
+ if (!isSink) {
+ appendTupleToFrame(t);
+ }
+ } catch (ACIDException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+ IFrame message = TaskUtil.get(HyracksConstants.KEY_MESSAGE, ctx);
+ if (message != null
+ && MessagingFrameTupleAppender.getMessageType(message) ==
MessagingFrameTupleAppender.MARKER_MESSAGE) {
+ message.reset();
+
message.getBuffer().put(MessagingFrameTupleAppender.NULL_FEED_MESSAGE);
+ message.getBuffer().flip();
+ }
+ }
+}
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
new file mode 100644
index 0000000000..bfe610b278
--- /dev/null
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/runtime/NoOpCommitRuntimeFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.asterix.transaction.management.runtime;
+
+import org.apache.asterix.common.api.IJobEventListenerFactory;
+import org.apache.hyracks.algebricks.runtime.base.IPushRuntime;
+import org.apache.hyracks.api.context.IHyracksTaskContext;
+import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.job.IJobletEventListenerFactory;
+/**
+ * Factory that produces NoOpCommitRuntime push runtimes. It reuses the
+ * fields it reads (datasetId, primaryKeyFields, isWriteTransaction,
+ * datasetPartitions, isSink, partitionerFactory) from CommitRuntimeFactory.
+ */
+public class NoOpCommitRuntimeFactory extends CommitRuntimeFactory {
+
+ private static final long serialVersionUID = 2L;
+ /**
+ * Forwards all six arguments, in the same order, to the
+ * CommitRuntimeFactory constructor.
+ */
+ public NoOpCommitRuntimeFactory(int datasetId, int[] primaryKeyFields,
boolean isWriteTransaction,
+ int[] datasetPartitions, boolean isSink, ITuplePartitionerFactory
partitionerFactory) {
+ super(datasetId, primaryKeyFields, isWriteTransaction,
datasetPartitions, isSink, partitionerFactory);
+ }
+ /**
+ * Builds a single-element array holding one NoOpCommitRuntime. The
+ * transaction id comes from the joblet's event listener factory, which
+ * is cast to IJobEventListenerFactory and queried with datasetId. The
+ * resource partition is the entry of datasetPartitions at the index of
+ * this task's partition number.
+ * NOTE(review): the cast assumes the joblet listener factory always
+ * implements IJobEventListenerFactory - confirm against the job setup.
+ *
+ * @param ctx the task context the runtime is created for
+ * @return an array containing exactly one NoOpCommitRuntime
+ */
+ @Override
+ public IPushRuntime[] createPushRuntime(IHyracksTaskContext ctx) throws
HyracksDataException {
+ IJobletEventListenerFactory fact =
ctx.getJobletContext().getJobletEventListenerFactory();
+ return new IPushRuntime[] { new NoOpCommitRuntime(ctx,
((IJobEventListenerFactory) fact).getTxnId(datasetId),
+ datasetId, primaryKeyFields, isWriteTransaction,
+
datasetPartitions[ctx.getTaskAttemptId().getTaskId().getPartition()], isSink,
partitionerFactory,
+ datasetPartitions) };
+ }
+}
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
index 3c829e65ee..b076cf2c0d 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/HyracksConstants.java
@@ -30,6 +30,8 @@ public class HyracksConstants {
public static final String TUPLE_PROJECTOR = "TUPLE_PROJECTOR";
+ public static final String ATOMIC_OP_CONTEXT = "ATOMIC_OP_CONTEXT";
+
private HyracksConstants() {
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
index c3bc5cacb6..e279eaa844 100644
---
a/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
+++
b/hyracks-fullstack/hyracks/hyracks-examples/hyracks-integration-tests/src/test/java/org/apache/hyracks/tests/am/lsm/btree/LSMBTreeOperatorTestHelper.java
@@ -49,6 +49,6 @@ public class LSMBTreeOperatorTestHelper extends
LSMTreeOperatorTestHelper {
getVirtualBufferCacheProvider(),
SynchronousSchedulerProvider.INSTANCE, MERGE_POLICY_FACTORY,
MERGE_POLICY_PROPERTIES, DURABLE, bloomFilterKeyFields,
LSMTreeOperatorTestHelper.DEFAULT_BLOOM_FILTER_FALSE_POSITIVE_RATE, true,
btreefields,
- NoOpCompressorDecompressorFactory.INSTANCE,
bloomFilterKeyFields != null, null, null, false);
+ NoOpCompressorDecompressorFactory.INSTANCE,
bloomFilterKeyFields != null, null, null, false, false);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
index 422aef36fc..10425f4d86 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/dataflow/IndexInsertUpdateDeleteOperatorNodePushable.java
@@ -69,6 +69,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable
extends AbstractUnaryIn
protected final int[] partitions;
protected final Int2IntMap storagePartitionId2Index;
protected boolean writerOpen;
+ protected boolean failed;
public IndexInsertUpdateDeleteOperatorNodePushable(IHyracksTaskContext
ctx, int partition,
IIndexDataflowHelperFactory indexHelperFactory, int[]
fieldPermutation, RecordDescriptor inputRecDesc,
@@ -92,6 +93,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable
extends AbstractUnaryIn
this.op = op;
this.tuple.setFieldPermutation(fieldPermutation);
this.tuplePartitioner = tuplePartitionerFactory.createPartitioner(ctx);
+ this.failed = false;
}
@Override
@@ -203,6 +205,7 @@ public class IndexInsertUpdateDeleteOperatorNodePushable
extends AbstractUnaryIn
@Override
public void fail() throws HyracksDataException {
+ failed = true;
if (writerOpen) {
writer.fail();
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
index 8c47bba1ff..a0b592a5e1 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResource.java
@@ -61,22 +61,22 @@ public class LSMColumnBTreeLocalResource extends
LSMBTreeLocalResource {
ILSMIOOperationSchedulerProvider ioSchedulerProvider,
ICompressorDecompressorFactory compressorDecompressorFactory,
ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector, boolean
isSecondaryNoIncrementalMaintenance,
- IColumnManagerFactory columnManagerFactory) {
+ IColumnManagerFactory columnManagerFactory, boolean atomic) {
super(typeTraits, cmpFactories, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, true, path, storageManager,
mergePolicyFactory, mergePolicyProperties, null, null,
btreeFields, null, opTrackerProvider,
ioOpCallbackFactory, pageWriteCallbackFactory,
metadataPageManagerFactory, vbcProvider,
ioSchedulerProvider, true, compressorDecompressorFactory,
true, nullTypeTraits, nullIntrospector,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, atomic);
this.columnManagerFactory = columnManagerFactory;
}
private LSMColumnBTreeLocalResource(IPersistedResourceRegistry registry,
JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[]
btreeFields,
ICompressorDecompressorFactory compressorDecompressorFactory,
boolean hasBloomFilter,
- boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory
columnManagerFactory)
+ boolean isSecondaryNoIncrementalMaintenance, IColumnManagerFactory
columnManagerFactory, boolean atomic)
throws HyracksDataException {
super(registry, json, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, isPrimary, btreeFields,
- compressorDecompressorFactory, hasBloomFilter,
isSecondaryNoIncrementalMaintenance);
+ compressorDecompressorFactory, hasBloomFilter,
isSecondaryNoIncrementalMaintenance, atomic);
this.columnManagerFactory = columnManagerFactory;
}
@@ -93,7 +93,7 @@ public class LSMColumnBTreeLocalResource extends
LSMBTreeLocalResource {
opTrackerProvider.getOperationTracker(serviceCtx, this),
ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, pageWriteCallbackFactory, btreeFields,
metadataPageManagerFactory, false,
serviceCtx.getTracer(), compressorDecompressorFactory,
nullTypeTraits, nullIntrospector,
- columnManagerFactory);
+ columnManagerFactory, atomic);
}
public static IJsonSerializable fromJson(IPersistedResourceRegistry
registry, JsonNode json)
@@ -109,11 +109,12 @@ public class LSMColumnBTreeLocalResource extends
LSMBTreeLocalResource {
JsonNode columnManagerFactoryNode = json.get("columnManagerFactory");
boolean isSecondaryNoIncrementalMaintenance =
getOrDefaultBoolean(json,
"isSecondaryNoIncrementalMaintenance", false);
+ boolean atomic = getOrDefaultBoolean(json, "atomic", false);
IColumnManagerFactory columnManagerFactory =
(IColumnManagerFactory)
registry.deserialize(columnManagerFactoryNode);
return new LSMColumnBTreeLocalResource(registry, json,
bloomFilterKeyFields, bloomFilterFalsePositiveRate,
isPrimary, btreeFields, compDecompFactory, hasBloomFilter,
isSecondaryNoIncrementalMaintenance,
- columnManagerFactory);
+ columnManagerFactory, atomic);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
index eccb7c2ceb..2cd045a5d9 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/dataflow/LSMColumnBTreeLocalResourceFactory.java
@@ -51,12 +51,12 @@ public class LSMColumnBTreeLocalResourceFactory extends
LSMBTreeLocalResourceFac
Map<String, String> mergePolicyProperties, int[]
bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
int[] btreeFields, ICompressorDecompressorFactory
compressorDecompressorFactory, ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector, boolean
isSecondaryNoIncrementalMaintenance,
- IColumnManagerFactory columnManagerFactory) {
+ IColumnManagerFactory columnManagerFactory, boolean atomic) {
super(storageManager, typeTraits, cmpFactories, filterTypeTraits,
filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory,
pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, mergePolicyFactory,
mergePolicyProperties, true, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, true, btreeFields,
compressorDecompressorFactory, true, nullTypeTraits,
- nullIntrospector, isSecondaryNoIncrementalMaintenance);
+ nullIntrospector, isSecondaryNoIncrementalMaintenance, atomic);
this.columnManagerFactory = columnManagerFactory;
}
@@ -66,6 +66,6 @@ public class LSMColumnBTreeLocalResourceFactory extends
LSMBTreeLocalResourceFac
bloomFilterFalsePositiveRate, fileRef.getRelativePath(),
storageManager, mergePolicyFactory,
mergePolicyProperties, btreeFields, opTrackerProvider,
ioOpCallbackFactory, pageWriteCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
compressorDecompressorFactory,
- nullTypeTraits, nullIntrospector,
isSecondaryNoIncrementalMaintenance, columnManagerFactory);
+ nullTypeTraits, nullIntrospector,
isSecondaryNoIncrementalMaintenance, columnManagerFactory, atomic);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
index 048d9dedd1..e95b3808ca 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/impls/lsm/LSMColumnBTree.java
@@ -70,11 +70,13 @@ public class LSMColumnBTree extends LSMBTree {
double bloomFilterFalsePositiveRate, int fieldCount,
IBinaryComparatorFactory[] cmpFactories,
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
- int[] btreeFields, ITracer tracer, IColumnManager columnManager)
throws HyracksDataException {
+ int[] btreeFields, ITracer tracer, IColumnManager columnManager,
boolean atomic)
+ throws HyracksDataException {
super(ioManager, virtualBufferCaches, interiorFrameFactory,
insertLeafFrameFactory, deleteLeafFrameFactory,
diskBufferCache, fileManager, componentFactory,
bulkloadComponentFactory, null, null, null,
bloomFilterFalsePositiveRate, fieldCount, cmpFactories,
mergePolicy, opTracker, ioScheduler,
- ioOpCallbackFactory, pageWriteCallbackFactory, true, true,
btreeFields, null, true, false, tracer);
+ ioOpCallbackFactory, pageWriteCallbackFactory, true, true,
btreeFields, null, true, false, tracer,
+ atomic);
this.columnManager = columnManager;
this.mergeComponentFactory = mergeComponentFactory;
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
index 1a554477f6..6ef0dc6f1d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree-column/src/main/java/org/apache/hyracks/storage/am/lsm/btree/column/utils/LSMColumnBTreeUtil.java
@@ -63,7 +63,7 @@ public class LSMColumnBTreeUtil {
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
int[] btreeFields, IMetadataPageManagerFactory
freePageManagerFactory, boolean updateAware, ITracer tracer,
ICompressorDecompressorFactory compressorDecompressorFactory,
ITypeTraits nullTypeTraits,
- INullIntrospector nullIntrospector, IColumnManagerFactory
columnManagerFactory)
+ INullIntrospector nullIntrospector, IColumnManagerFactory
columnManagerFactory, boolean atomic)
throws HyracksDataException {
//Tuple writers
@@ -111,6 +111,6 @@ public class LSMColumnBTreeUtil {
deleteLeafFrameFactory, diskBufferCache, fileNameManager,
flushComponentFactory, mergeComponentFactory,
bulkLoadComponentFactory, bloomFilterFalsePositiveRate,
typeTraits.length, cmpFactories, mergePolicy,
opTracker, ioScheduler, ioOpCallbackFactory,
pageWriteCallbackFactory, btreeFields, tracer,
- columnManagerFactory.createColumnManager());
+ columnManagerFactory.createColumnManager(), atomic);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
index 0a47fc0137..43555ca126 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResource.java
@@ -60,6 +60,7 @@ public class LSMBTreeLocalResource extends LsmResource {
protected final int[] btreeFields;
protected final ICompressorDecompressorFactory
compressorDecompressorFactory;
protected final boolean isSecondaryNoIncrementalMaintenance;
+ protected final boolean atomic;
public LSMBTreeLocalResource(ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] cmpFactories,
int[] bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
boolean isPrimary, String path,
@@ -71,8 +72,8 @@ public class LSMBTreeLocalResource extends LsmResource {
IMetadataPageManagerFactory metadataPageManagerFactory,
IVirtualBufferCacheProvider vbcProvider,
ILSMIOOperationSchedulerProvider ioSchedulerProvider, boolean
durable,
ICompressorDecompressorFactory compressorDecompressorFactory,
boolean hasBloomFilter,
- ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
- boolean isSecondaryNoIncrementalMaintenance) {
+ ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
boolean isSecondaryNoIncrementalMaintenance,
+ boolean atomic) {
super(path, storageManager, typeTraits, cmpFactories,
filterTypeTraits, filterCmpFactories, filterFields,
opTrackerProvider, ioOpCallbackFactory,
pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, mergePolicyFactory,
mergePolicyProperties, durable, nullTypeTraits,
@@ -84,12 +85,13 @@ public class LSMBTreeLocalResource extends LsmResource {
this.compressorDecompressorFactory = compressorDecompressorFactory;
this.hasBloomFilter = hasBloomFilter;
this.isSecondaryNoIncrementalMaintenance =
isSecondaryNoIncrementalMaintenance;
+ this.atomic = atomic;
}
protected LSMBTreeLocalResource(IPersistedResourceRegistry registry,
JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[]
btreeFields,
ICompressorDecompressorFactory compressorDecompressorFactory,
boolean hasBloomFilter,
- boolean isSecondaryNoIncrementalMaintenance) throws
HyracksDataException {
+ boolean isSecondaryNoIncrementalMaintenance, boolean atomic)
throws HyracksDataException {
super(registry, json);
this.bloomFilterKeyFields = bloomFilterKeyFields;
this.bloomFilterFalsePositiveRate = bloomFilterFalsePositiveRate;
@@ -98,6 +100,7 @@ public class LSMBTreeLocalResource extends LsmResource {
this.compressorDecompressorFactory = compressorDecompressorFactory;
this.hasBloomFilter = hasBloomFilter;
this.isSecondaryNoIncrementalMaintenance =
isSecondaryNoIncrementalMaintenance;
+ this.atomic = atomic;
}
@Override
@@ -115,7 +118,7 @@ public class LSMBTreeLocalResource extends LsmResource {
opTrackerProvider.getOperationTracker(serviceCtx, this),
ioSchedulerProvider.getIoScheduler(serviceCtx),
ioOpCallbackFactory, pageWriteCallbackFactory, isPrimary,
filterTypeTraits, filterCmpFactories,
btreeFields, filterFields, durable,
metadataPageManagerFactory, updateAware, serviceCtx.getTracer(),
- compressorDecompressorFactory, hasBloomFilter, nullTypeTraits,
nullIntrospector);
+ compressorDecompressorFactory, hasBloomFilter, nullTypeTraits,
nullIntrospector, atomic);
}
public boolean isSecondaryNoIncrementalMaintenance() {
@@ -141,8 +144,9 @@ public class LSMBTreeLocalResource extends LsmResource {
.deserializeOrDefault(compressorDecompressorNode,
NoOpCompressorDecompressorFactory.class);
boolean isSecondaryNoIncrementalMaintenance =
getOrDefaultBoolean(json,
"isSecondaryNoIncrementalMaintenance", false);
+ boolean atomic = getOrDefaultBoolean(json, "atomic", false);
return new LSMBTreeLocalResource(registry, json, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, isPrimary,
- btreeFields, compDecompFactory, hasBloomFilter,
isSecondaryNoIncrementalMaintenance);
+ btreeFields, compDecompFactory, hasBloomFilter,
isSecondaryNoIncrementalMaintenance, atomic);
}
@Override
@@ -156,6 +160,7 @@ public class LSMBTreeLocalResource extends LsmResource {
json.putPOJO("btreeFields", btreeFields);
json.putPOJO("compressorDecompressorFactory",
compressorDecompressorFactory.toJson(registry));
json.put("isSecondaryNoIncrementalMaintenance",
isSecondaryNoIncrementalMaintenance);
+ json.put("atomic", atomic);
}
protected static boolean getOrDefaultHasBloomFilter(JsonNode json, boolean
isPrimary) {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
index 6695e9057e..bb44655d74 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/dataflow/LSMBTreeLocalResourceFactory.java
@@ -49,6 +49,7 @@ public class LSMBTreeLocalResourceFactory extends
LsmResourceFactory {
protected final int[] btreeFields;
protected final ICompressorDecompressorFactory
compressorDecompressorFactory;
protected final boolean isSecondaryNoIncrementalMaintenance;
+ protected final boolean atomic;
public LSMBTreeLocalResourceFactory(IStorageManager storageManager,
ITypeTraits[] typeTraits,
IBinaryComparatorFactory[] cmpFactories, ITypeTraits[]
filterTypeTraits,
@@ -60,8 +61,8 @@ public class LSMBTreeLocalResourceFactory extends
LsmResourceFactory {
Map<String, String> mergePolicyProperties, boolean durable, int[]
bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[]
btreeFields,
ICompressorDecompressorFactory compressorDecompressorFactory,
boolean hasBloomFilter,
- ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
- boolean isSecondaryNoIncrementalMaintenance) {
+ ITypeTraits nullTypeTraits, INullIntrospector nullIntrospector,
boolean isSecondaryNoIncrementalMaintenance,
+ boolean atomic) {
super(storageManager, typeTraits, cmpFactories, filterTypeTraits,
filterCmpFactories, filterFields,
opTrackerFactory, ioOpCallbackFactory,
pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, mergePolicyFactory,
mergePolicyProperties, durable, nullTypeTraits,
@@ -73,6 +74,7 @@ public class LSMBTreeLocalResourceFactory extends
LsmResourceFactory {
this.btreeFields = btreeFields;
this.compressorDecompressorFactory = compressorDecompressorFactory;
this.isSecondaryNoIncrementalMaintenance =
isSecondaryNoIncrementalMaintenance;
+ this.atomic = atomic;
}
@Override
@@ -82,7 +84,7 @@ public class LSMBTreeLocalResourceFactory extends
LsmResourceFactory {
filterTypeTraits, filterCmpFactories, btreeFields,
filterFields, opTrackerProvider, ioOpCallbackFactory,
pageWriteCallbackFactory, metadataPageManagerFactory,
vbcProvider, ioSchedulerProvider, durable,
compressorDecompressorFactory, hasBloomFilter, nullTypeTraits,
nullIntrospector,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, atomic);
}
private void readObject(java.io.ObjectInputStream in) throws IOException,
ClassNotFoundException {
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
index eceefc68c5..05e78dd44d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/impls/LSMBTree.java
@@ -100,11 +100,11 @@ public class LSMBTree extends AbstractLSMIndex implements
ITreeIndex {
ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler,
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
boolean needKeyDupCheck, boolean hasBloomFilter, int[]
btreeFields, int[] filterFields, boolean durable,
- boolean updateAware, ITracer tracer) throws HyracksDataException {
+ boolean updateAware, ITracer tracer, boolean atomic) throws
HyracksDataException {
super(ioManager, virtualBufferCaches, diskBufferCache, fileManager,
bloomFilterFalsePositiveRate, mergePolicy,
opTracker, ioScheduler, ioOpCallbackFactory,
pageWriteCallbackFactory, componentFactory,
bulkLoadComponentFactory, filterFrameFactory, filterManager,
filterFields, durable, filterHelper,
- btreeFields, tracer);
+ btreeFields, tracer, atomic);
this.insertLeafFrameFactory = insertLeafFrameFactory;
this.deleteLeafFrameFactory = deleteLeafFrameFactory;
this.cmpFactories = cmpFactories;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
index c3ca3d5bde..3106b2924c 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-btree/src/main/java/org/apache/hyracks/storage/am/lsm/btree/utils/LSMBTreeUtil.java
@@ -72,6 +72,23 @@ public class LSMBTreeUtil {
boolean updateAware, ITracer tracer,
ICompressorDecompressorFactory compressorDecompressorFactory,
boolean hasBloomFilter, ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector)
throws HyracksDataException {
+ return createLSMTree(ioManager, virtualBufferCaches, file,
diskBufferCache, typeTraits, cmpFactories,
+ bloomFilterKeyFields, bloomFilterFalsePositiveRate,
mergePolicy, opTracker, ioScheduler,
+ ioOpCallbackFactory, pageWriteCallbackFactory,
needKeyDupCheck, filterTypeTraits, filterCmpFactories,
+ btreeFields, filterFields, durable, freePageManagerFactory,
updateAware, tracer,
+ compressorDecompressorFactory, hasBloomFilter, nullTypeTraits,
nullIntrospector, false);
+ }
+
+ public static LSMBTree createLSMTree(IIOManager ioManager,
List<IVirtualBufferCache> virtualBufferCaches,
+ FileReference file, IBufferCache diskBufferCache, ITypeTraits[]
typeTraits,
+ IBinaryComparatorFactory[] cmpFactories, int[]
bloomFilterKeyFields, double bloomFilterFalsePositiveRate,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ boolean needKeyDupCheck, ITypeTraits[] filterTypeTraits,
IBinaryComparatorFactory[] filterCmpFactories,
+ int[] btreeFields, int[] filterFields, boolean durable,
IMetadataPageManagerFactory freePageManagerFactory,
+ boolean updateAware, ITracer tracer,
ICompressorDecompressorFactory compressorDecompressorFactory,
+ boolean hasBloomFilter, ITypeTraits nullTypeTraits,
INullIntrospector nullIntrospector, boolean atomic)
+ throws HyracksDataException {
LSMBTreeTupleWriterFactory insertTupleWriterFactory = new
LSMBTreeTupleWriterFactory(typeTraits,
cmpFactories.length, false, updateAware, nullTypeTraits,
nullIntrospector);
LSMBTreeTupleWriterFactory deleteTupleWriterFactory = new
LSMBTreeTupleWriterFactory(typeTraits,
@@ -124,6 +141,6 @@ public class LSMBTreeUtil {
deleteLeafFrameFactory, diskBufferCache, fileNameManager,
componentFactory, bulkLoadComponentFactory,
filterHelper, filterFrameFactory, filterManager,
bloomFilterFalsePositiveRate, typeTraits.length,
cmpFactories, mergePolicy, opTracker, ioScheduler,
ioOpCallbackFactory, pageWriteCallbackFactory,
- needKeyDupCheck, hasBloomFilter, btreeFields, filterFields,
durable, updateAware, tracer);
+ needKeyDupCheck, hasBloomFilter, btreeFields, filterFields,
durable, updateAware, tracer, atomic);
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
index 69b954775b..4756921400 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIndex.java
@@ -43,6 +43,27 @@ import org.apache.hyracks.storage.common.ISearchPredicate;
*/
public interface ILSMIndex extends IIndex {
+ boolean isAtomic();
+
+ /**
+ * Commits the atomic statement by adding all the temporary disk
components generated by the statement to the
+ * list of disk components. Queries issued after a call to this function will be
able to read all the changes made by
+ * the insert/upsert/delete statement.
+ *
+ * @throws HyracksDataException
+ */
+ void commit() throws HyracksDataException;
+
+ /**
+ * Aborts the ongoing statement by destroying all temporary disk
components generated by the statement and
+ * resetting the memory components.
+ *
+ * @throws HyracksDataException
+ */
+ void abort() throws HyracksDataException;
+
+ ILSMMergePolicy getMergePolicy();
+
void deactivate(boolean flush) throws HyracksDataException;
@Override
@@ -106,6 +127,8 @@ public interface ILSMIndex extends IIndex {
void addDiskComponent(ILSMDiskComponent index) throws HyracksDataException;
+ void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws
HyracksDataException;
+
void subsumeMergedComponents(ILSMDiskComponent newComponent,
List<ILSMComponent> mergedComponents)
throws HyracksDataException;
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
index c64e1e1667..2ec59d600d 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/dataflow/LSMIndexInsertUpdateDeleteOperatorNodePushable.java
@@ -19,11 +19,14 @@
package org.apache.hyracks.storage.am.lsm.common.dataflow;
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.ITuplePartitionerFactory;
import org.apache.hyracks.api.dataflow.value.RecordDescriptor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
import org.apache.hyracks.dataflow.common.comm.util.FrameUtils;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
@@ -31,8 +34,11 @@ import
org.apache.hyracks.storage.am.common.api.ITupleFilterFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexInsertUpdateDeleteOperatorNodePushable;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFrameWriter;
+import org.apache.hyracks.storage.common.IIndex;
+import org.apache.hyracks.storage.common.IIndexAccessor;
public class LSMIndexInsertUpdateDeleteOperatorNodePushable extends
IndexInsertUpdateDeleteOperatorNodePushable
implements ILSMIndexFrameWriter {
@@ -134,4 +140,12 @@ public class
LSMIndexInsertUpdateDeleteOperatorNodePushable extends IndexInsertU
}
appender.write(writer, true);
}
+
+ protected void setAtomicOpContextIfAtomic(IIndex index, IIndexAccessor
accessor) {
+ if (((ILSMIndex) index).isAtomic()) {
+ Map<String, Object> indexAccessorOpContextParameters = new
HashMap<>();
+
indexAccessorOpContextParameters.put(HyracksConstants.ATOMIC_OP_CONTEXT, true);
+ ((ILSMIndexAccessor)
accessor).getOpContext().setParameters(indexAccessorOpContextParameters);
+ }
+ }
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
index 2745b44b3a..b10dd5c08f 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMIndex.java
@@ -39,6 +39,7 @@ import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IIOManager;
import
org.apache.hyracks.api.replication.IReplicationJob.ReplicationExecutionType;
import org.apache.hyracks.api.replication.IReplicationJob.ReplicationOperation;
+import org.apache.hyracks.api.util.HyracksConstants;
import org.apache.hyracks.dataflow.common.data.accessors.ITupleReference;
import org.apache.hyracks.storage.am.common.impls.AbstractSearchPredicate;
import org.apache.hyracks.storage.am.common.impls.NoOpIndexAccessParameters;
@@ -112,6 +113,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
protected final ILSMDiskComponentFactory bulkLoadComponentFactory;
protected final ILSMPageWriteCallbackFactory pageWriteCallbackFactory;
private int numScheduledFlushes = 0;
+ private final boolean atomic;
+ private final List<ILSMDiskComponent> temporaryDiskComponents;
+ private final ILSMMergePolicy mergePolicy;
public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache>
virtualBufferCaches,
IBufferCache diskBufferCache, ILSMIndexFileManager fileManager,
double bloomFilterFalsePositiveRate,
@@ -119,8 +123,8 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
ILSMDiskComponentFactory componentFactory,
ILSMDiskComponentFactory bulkLoadComponentFactory,
ILSMComponentFilterFrameFactory filterFrameFactory,
LSMComponentFilterManager filterManager,
- int[] filterFields, boolean durable, IComponentFilterHelper
filterHelper, int[] treeFields, ITracer tracer)
- throws HyracksDataException {
+ int[] filterFields, boolean durable, IComponentFilterHelper
filterHelper, int[] treeFields, ITracer tracer,
+ boolean atomic) throws HyracksDataException {
this.ioManager = ioManager;
this.virtualBufferCaches = virtualBufferCaches;
this.diskBufferCache = diskBufferCache;
@@ -139,6 +143,10 @@ public abstract class AbstractLSMIndex implements
ILSMIndex {
this.inactiveMemoryComponents = new ArrayList<>();
this.durable = durable;
this.tracer = tracer;
+ this.atomic = atomic;
+ this.temporaryDiskComponents = new ArrayList<>();
+ this.mergePolicy = mergePolicy;
+
fileManager.initLastUsedSeq(ioOpCallback.getLastValidSequence());
lsmHarness = new LSMHarness(this, ioScheduler, mergePolicy, opTracker,
diskBufferCache.isReplicationEnabled(),
tracer);
@@ -152,6 +160,25 @@ public abstract class AbstractLSMIndex implements
ILSMIndex {
}
}
+ public AbstractLSMIndex(IIOManager ioManager, List<IVirtualBufferCache>
virtualBufferCaches,
+ IBufferCache diskBufferCache, ILSMIndexFileManager fileManager,
double bloomFilterFalsePositiveRate,
+ ILSMMergePolicy mergePolicy, ILSMOperationTracker opTracker,
ILSMIOOperationScheduler ioScheduler,
+ ILSMIOOperationCallbackFactory ioOpCallbackFactory,
ILSMPageWriteCallbackFactory pageWriteCallbackFactory,
+ ILSMDiskComponentFactory componentFactory,
ILSMDiskComponentFactory bulkLoadComponentFactory,
+ ILSMComponentFilterFrameFactory filterFrameFactory,
LSMComponentFilterManager filterManager,
+ int[] filterFields, boolean durable, IComponentFilterHelper
filterHelper, int[] treeFields, ITracer tracer)
+ throws HyracksDataException {
+ this(ioManager, virtualBufferCaches, diskBufferCache, fileManager,
bloomFilterFalsePositiveRate, mergePolicy,
+ opTracker, ioScheduler, ioOpCallbackFactory,
pageWriteCallbackFactory, componentFactory,
+ bulkLoadComponentFactory, filterFrameFactory, filterManager,
filterFields, durable, filterHelper,
+ treeFields, tracer, false);
+ }
+
+ @Override
+ public boolean isAtomic() {
+ return atomic;
+ }
+
@Override
public synchronized void create() throws HyracksDataException {
if (isActive) {
@@ -223,6 +250,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
for (ILSMDiskComponent c : diskComponents) {
c.deactivateAndPurge();
}
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ c.deactivateAndPurge();
+ }
}
private void deallocateMemoryComponents() throws HyracksDataException {
@@ -247,6 +277,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
for (ILSMDiskComponent c : diskComponents) {
c.destroy();
}
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ c.destroy();
+ }
}
@Override
@@ -265,6 +298,15 @@ public abstract class AbstractLSMIndex implements
ILSMIndex {
diskComponents.clear();
}
+ @Override
+ public void abort() throws HyracksDataException {
+ resetMemoryComponents();
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ c.deactivateAndDestroy();
+ }
+ temporaryDiskComponents.clear();
+ }
+
private void resetMemoryComponents() throws HyracksDataException {
if (memoryComponentsAllocated && memoryComponents != null) {
for (ILSMMemoryComponent c : memoryComponents) {
@@ -299,7 +341,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
operationalComponents.addAll(diskComponents);
break;
case SEARCH:
- if (memoryComponentsAllocated) {
+ // a search includes memory components when atomic statements are
not enabled on the dataset, or when the search
+ // is the duplicate-key check done by inserts/upserts on a dataset with
atomic statements enabled
+ if (memoryComponentsAllocated && (!atomic ||
isAtomicOpContext(ctx))) {
addOperationalMemoryComponents(operationalComponents,
false);
}
if (filterManager != null) {
@@ -315,6 +359,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
} else {
operationalComponents.addAll(diskComponents);
}
+ if (isAtomicOpContext(ctx)) {
+ operationalComponents.addAll(temporaryDiskComponents);
+ }
break;
case REPLICATE:
@@ -328,6 +375,11 @@ public abstract class AbstractLSMIndex implements
ILSMIndex {
}
}
+ private boolean isAtomicOpContext(ILSMIndexOperationContext ctx) {
+ Map<String, Object> ctxParameters = ctx.getParameters();
+ return ctxParameters != null && (boolean)
ctxParameters.getOrDefault(HyracksConstants.ATOMIC_OP_CONTEXT, false);
+ }
+
@Override
public void scanDiskComponents(ILSMIndexOperationContext ctx, IIndexCursor
cursor) throws HyracksDataException {
throw
HyracksDataException.create(ErrorCode.DISK_COMPONENT_SCAN_NOT_ALLOWED_FOR_SECONDARY_INDEX);
@@ -553,12 +605,35 @@ public abstract class AbstractLSMIndex implements
ILSMIndex {
@Override
public void addDiskComponent(ILSMDiskComponent c) throws
HyracksDataException {
+ if (c != EmptyComponent.INSTANCE) {
+ if (atomic) {
+ temporaryDiskComponents.add(c);
+ LOGGER.debug("Adding new temporary disk component to index {};
current count: {}", c,
+ temporaryDiskComponents.size());
+ } else {
+ diskComponents.add(0, c);
+ }
+ }
+ validateComponentIds();
+ }
+
+ @Override
+ public void addBulkLoadedDiskComponent(ILSMDiskComponent c) throws
HyracksDataException {
if (c != EmptyComponent.INSTANCE) {
diskComponents.add(0, c);
}
validateComponentIds();
}
+ @Override
+ public void commit() throws HyracksDataException {
+ for (ILSMDiskComponent c : temporaryDiskComponents) {
+ diskComponents.add(0, c);
+ }
+ temporaryDiskComponents.clear();
+ validateComponentIds();
+ }
+
@Override
public void subsumeMergedComponents(ILSMDiskComponent newComponent,
List<ILSMComponent> mergedComponents)
throws HyracksDataException {
@@ -879,4 +954,9 @@ public abstract class AbstractLSMIndex implements ILSMIndex
{
return pageWriteCallbackFactory;
}
+ @Override
+ public ILSMMergePolicy getMergePolicy() {
+ return mergePolicy;
+ }
+
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
index e2c9fabb48..9fcce8b361 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/LSMHarness.java
@@ -190,7 +190,9 @@ public class LSMHarness implements ILSMHarness {
if (opType == LSMOperationType.FLUSH) {
opTracker.notifyAll();
if (!failedOperation) {
- waitForLaggingMerge();
+ if (!lsmIndex.isAtomic()) {
+ waitForLaggingMerge();
+ }
}
} else if (opType == LSMOperationType.MERGE) {
opTracker.notifyAll();
@@ -299,7 +301,9 @@ public class LSMHarness implements ILSMHarness {
componentsToBeReplicated.add(newComponent);
triggerReplication(componentsToBeReplicated, opType);
}
- mergePolicy.diskComponentAdded(lsmIndex, false);
+ if (!lsmIndex.isAtomic()) {
+ mergePolicy.diskComponentAdded(lsmIndex, false);
+ }
}
break;
case MERGE:
@@ -639,7 +643,7 @@ public class LSMHarness implements ILSMHarness {
throw HyracksDataException.create(ioOperation.getFailure());
}
synchronized (opTracker) {
- lsmIndex.addDiskComponent(c);
+ lsmIndex.addBulkLoadedDiskComponent(c);
if (replicationEnabled) {
componentsToBeReplicated.clear();
componentsToBeReplicated.add(c);
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
index 2d0f079120..94d344742c 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java
@@ -95,7 +95,7 @@ public class TestLsmBtree extends LSMBTree {
diskBufferCache, fileManager, componentFactory,
bulkLoadComponentFactory, filterHelper,
filterFrameFactory, filterManager,
bloomFilterFalsePositiveRate, fieldCount, cmpFactories, mergePolicy,
opTracker, ioScheduler, ioOperationCallbackFactory,
pageWriteCallbackFactory, needKeyDupCheck,
- hasBloomFilter, btreeFields, filterFields, durable,
updateAware, tracer);
+ hasBloomFilter, btreeFields, filterFields, durable,
updateAware, tracer, false);
addModifyCallback(AllowTestOpCallback.INSTANCE);
addSearchCallback(AllowTestOpCallback.INSTANCE);
addFlushCallback(AllowTestOpCallback.INSTANCE);
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
index 039b244437..04fa11f1f0 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java
@@ -63,14 +63,14 @@ public class TestLsmBtreeLocalResource extends
LSMBTreeLocalResource {
btreeFields, filterFields, opTrackerProvider,
ioOpCallbackFactory, pageWriteCallbackFactory,
metadataPageManagerFactory, vbcProvider, ioSchedulerProvider,
durable,
NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter,
null, null,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, false);
}
protected TestLsmBtreeLocalResource(IPersistedResourceRegistry registry,
JsonNode json, int[] bloomFilterKeyFields,
double bloomFilterFalsePositiveRate, boolean isPrimary, int[]
btreeFields, boolean hasBloomFilter,
boolean isSecondaryNoIncrementalMaintenance) throws
HyracksDataException {
super(registry, json, bloomFilterKeyFields,
bloomFilterFalsePositiveRate, isPrimary, btreeFields,
- NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter,
isSecondaryNoIncrementalMaintenance);
+ NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter,
isSecondaryNoIncrementalMaintenance, false);
}
@Override
diff --git
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
index 48adf91826..54b786ba91 100644
---
a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResourceFactory.java
@@ -53,7 +53,7 @@ public class TestLsmBtreeLocalResourceFactory extends
LSMBTreeLocalResourceFacto
vbcProvider, ioSchedulerProvider, mergePolicyFactory,
mergePolicyProperties, durable,
bloomFilterKeyFields, bloomFilterFalsePositiveRate, isPrimary,
btreeFields,
NoOpCompressorDecompressorFactory.INSTANCE, hasBloomFilter,
null, null,
- isSecondaryNoIncrementalMaintenance);
+ isSecondaryNoIncrementalMaintenance, false);
}
@Override