[NO ISSUE][STO] Fix memory leaks in storage
- user model changes: no
- storage format changes: no
- interface changes: yes
- Added javadocs to:
-- IBufferCache
-- IExtraPageBlockHelper
- Moved IBufferCache.setPageDiskId -> ICachedPage.setDiskPageId
- Renamed:
-- IBufferCache.flushDirtyPage -> IBufferCache.flush
-- IBufferCache.getNumPages -> IBufferCache.getPageBudget
- Removed:
-- IBufferCache.adviseWontNeed [not used]
-- IBufferCache.tryPin [not used]
details:
- Previously, when adding a kv pair to the metadata of a memory
component, we always added a new Pair item to the ArrayList. After
this change, if the key already exists, we only update its value.
- VirtualBufferCache used to leak pages when reclaiming pages
of a file after deletion. This has also been fixed.
- New tests for VirtualBufferCache added:
- Check of the memory budget at the end of testDisjointPins
- Multiple users pinning pages concurrently
- Test for large pages, ensuring that allocated large
pages are accounted for through removal of cached
free pages.
Change-Id: I4ae9736c9b5fdba5795245bdf835c023e3f73b15
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2115
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/fbf3c0a9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/fbf3c0a9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/fbf3c0a9
Branch: refs/heads/master
Commit: fbf3c0a9718941122513ee2ae195e3a6f9df661e
Parents: c04046c
Author: Abdullah Alamoudi <[email protected]>
Authored: Thu Nov 2 19:48:09 2017 -0700
Committer: Michael Blow <[email protected]>
Committed: Fri Nov 3 08:23:18 2017 -0700
----------------------------------------------------------------------
.../active/SingleThreadEventProcessor.java | 84 -----
.../app/active/ActiveNotificationHandler.java | 2 +-
.../resources/asterix-build-configuration.xml | 2 +-
.../app/bootstrap/TestNodeController.java | 136 ++++++--
.../org/apache/asterix/test/active/Actor.java | 2 +-
.../test/dataflow/ComponentRollbackTest.java | 38 +--
.../asterix/test/dataflow/LogMarkerTest.java | 17 +-
.../asterix/test/logging/CheckpointingTest.java | 17 +-
.../asterix/test/storage/DiskIsFullTest.java | 27 +-
.../asterix/common/exceptions/ErrorCode.java | 1 -
.../main/resources/asx_errormsg/en.properties | 1 -
.../hyracks/api/exceptions/ErrorCode.java | 4 +
.../api/util/SingleThreadEventProcessor.java | 83 +++++
.../src/main/resources/errormsg/en.properties | 4 +
.../hyracks/data/std/api/AbstractPointable.java | 13 -
.../apache/hyracks/data/std/util/DataUtils.java | 59 ++++
.../hyracks/storage/am/btree/impls/BTree.java | 10 +-
.../AppendOnlyLinkedMetadataPageManager.java | 8 +-
.../freepage/LinkedMetaDataPageManager.java | 6 +-
.../am/common/impls/AbstractTreeIndex.java | 5 +
.../common/impls/AbstractLSMDiskComponent.java | 5 +
.../impls/AbstractLSMMemoryComponent.java | 2 +-
.../lsm/common/impls/DiskComponentMetadata.java | 2 +-
.../common/impls/MemoryComponentMetadata.java | 9 +-
.../impls/MultitenantVirtualBufferCache.java | 29 +-
.../lsm/common/impls/NoMergePolicyFactory.java | 4 +-
.../am/lsm/common/impls/VirtualBufferCache.java | 298 +++++++++--------
.../inmemory/InMemoryInvertedIndex.java | 2 +-
.../hyracks/storage/am/rtree/impls/RTree.java | 7 +-
.../storage/common/buffercache/BufferCache.java | 63 +---
.../storage/common/buffercache/CachedPage.java | 14 +-
.../common/buffercache/DebugBufferCache.java | 24 +-
.../common/buffercache/FIFOLocalWriter.java | 12 +-
.../common/buffercache/IBufferCache.java | 153 ++++++++-
.../storage/common/buffercache/ICachedPage.java | 9 +
.../buffercache/IExtraPageBlockHelper.java | 19 ++
.../common/buffercache/IFIFOPageQueue.java | 3 +-
.../storage/common/buffercache/VirtualPage.java | 10 +
.../lsm/common/test/VirtualBufferCacheTest.java | 320 ++++++++++++++++---
.../hyracks/storage/common/BufferCacheTest.java | 32 +-
40 files changed, 989 insertions(+), 547 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
b/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
deleted file mode 100644
index de6682d..0000000
---
a/asterixdb/asterix-active/src/main/java/org/apache/asterix/active/SingleThreadEventProcessor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.active;
-
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.exceptions.ErrorCode;
-import org.apache.asterix.common.exceptions.RuntimeDataException;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-
-public abstract class SingleThreadEventProcessor<T> implements Runnable {
-
- private static final Logger LOGGER =
Logger.getLogger(SingleThreadEventProcessor.class.getName());
- private final String name;
- private final LinkedBlockingQueue<T> eventInbox;
- private volatile Thread executorThread;
- private volatile boolean stopped = false;
-
- public SingleThreadEventProcessor(String threadName) {
- this.name = threadName;
- eventInbox = new LinkedBlockingQueue<>();
- executorThread = new Thread(this, threadName);
- executorThread.start();
- }
-
- @Override
- public final void run() {
- LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
- while (!stopped) {
- try {
- T event = eventInbox.take();
- handle(event);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } catch (Exception e) {
- LOGGER.log(Level.SEVERE, "Error handling an event", e);
- }
- }
- LOGGER.log(Level.WARNING, "Stopped " +
Thread.currentThread().getName());
- }
-
- protected abstract void handle(T event) throws Exception; //NOSONAR
-
- public void add(T event) {
- if (!eventInbox.add(event)) {
- throw new IllegalStateException();
- }
- }
-
- public void stop() throws HyracksDataException, InterruptedException {
- stopped = true;
- executorThread.interrupt();
- executorThread.join(1000);
- int attempt = 0;
- while (executorThread.isAlive()) {
- attempt++;
- LOGGER.log(Level.WARNING,
- "Failed to stop event processor after " + attempt + "
attempts. Interrupted exception swallowed?");
- if (attempt == 10) {
- throw new
RuntimeDataException(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
- }
- executorThread.interrupt();
- executorThread.join(1000);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
index cbcc44f..d13a15d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/active/ActiveNotificationHandler.java
@@ -31,7 +31,6 @@ import org.apache.asterix.active.ActivityState;
import org.apache.asterix.active.EntityId;
import org.apache.asterix.active.IActiveEntityEventsListener;
import org.apache.asterix.active.IActiveNotificationHandler;
-import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.active.message.ActivePartitionMessage;
import org.apache.asterix.common.api.IMetadataLockManager;
import org.apache.asterix.common.exceptions.AsterixException;
@@ -48,6 +47,7 @@ import org.apache.hyracks.api.job.IJobLifecycleListener;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.JobSpecification;
import org.apache.hyracks.api.job.JobStatus;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
public class ActiveNotificationHandler extends
SingleThreadEventProcessor<ActiveEvent>
implements IActiveNotificationHandler, IJobLifecycleListener {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
index ce7eb3d..7eba9eb 100644
--- a/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
+++ b/asterixdb/asterix-app/src/main/resources/asterix-build-configuration.xml
@@ -77,7 +77,7 @@
</property>
<property>
<name>storage.memorycomponent.numpages</name>
- <value>8</value>
+ <value>16</value>
<description>The number of pages to allocate for a memory component.
This budget is shared by all the memory components of the primary
index and all its secondary indexes across all I/O devices on a node.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
index 104f80b..b155b51 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/app/bootstrap/TestNodeController.java
@@ -19,6 +19,7 @@
package org.apache.asterix.app.bootstrap;
import java.io.File;
+import java.rmi.RemoteException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -36,20 +37,27 @@ import
org.apache.asterix.common.context.IStorageComponentProvider;
import org.apache.asterix.common.context.TransactionSubsystemProvider;
import org.apache.asterix.common.dataflow.ICcApplicationContext;
import org.apache.asterix.common.dataflow.LSMInsertDeleteOperatorNodePushable;
+import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.transactions.IRecoveryManager.ResourceType;
import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.dataflow.data.nontagged.MissingWriterFactory;
import org.apache.asterix.file.StorageComponentProvider;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
+import org.apache.asterix.metadata.MetadataManager;
+import org.apache.asterix.metadata.MetadataTransactionContext;
import org.apache.asterix.metadata.declared.MetadataProvider;
import org.apache.asterix.metadata.entities.Dataset;
import org.apache.asterix.metadata.entities.Dataverse;
import org.apache.asterix.metadata.entities.Index;
+import org.apache.asterix.metadata.utils.DatasetUtil;
import org.apache.asterix.metadata.utils.MetadataUtil;
import org.apache.asterix.metadata.utils.SplitsAndConstraintsUtil;
import org.apache.asterix.om.types.ARecordType;
import org.apache.asterix.om.types.IAType;
+import org.apache.asterix.runtime.formats.FormatUtils;
import org.apache.asterix.runtime.formats.NonTaggedDataFormat;
+import
org.apache.asterix.runtime.operators.LSMPrimaryUpsertOperatorNodePushable;
import org.apache.asterix.runtime.utils.CcApplicationContext;
import org.apache.asterix.test.runtime.ExecutionTestUtil;
import
org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation;
@@ -81,11 +89,13 @@ import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorDescripto
import
org.apache.hyracks.storage.am.btree.dataflow.BTreeSearchOperatorNodePushable;
import org.apache.hyracks.storage.am.common.api.IIndexBuilder;
import
org.apache.hyracks.storage.am.common.api.IModificationOperationCallbackFactory;
+import
org.apache.hyracks.storage.am.common.api.ISearchOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.build.IndexBuilderFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IIndexDataflowHelperFactory;
import
org.apache.hyracks.storage.am.common.dataflow.IndexDataflowHelperFactory;
import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallbackFactory;
import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
+import
org.apache.hyracks.storage.am.lsm.common.api.IFrameOperationCallbackFactory;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
import org.apache.hyracks.storage.am.lsm.common.impls.NoMergePolicyFactory;
import org.apache.hyracks.storage.common.IResourceFactory;
@@ -113,9 +123,7 @@ public class TestNodeController {
public static final double BLOOM_FILTER_FALSE_POSITIVE_RATE = 0.01;
public static final TransactionSubsystemProvider TXN_SUBSYSTEM_PROVIDER =
TransactionSubsystemProvider.INSTANCE;
// Mutables
- private JobId jobId;
private long jobCounter = 0L;
- private IHyracksJobletContext jobletCtx;
private final String testConfigFileName;
private final boolean runHDFS;
@@ -137,9 +145,6 @@ public class TestNodeController {
th.printStackTrace();
throw th;
}
- jobletCtx = Mockito.mock(IHyracksJobletContext.class);
-
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
- Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
}
public void deInit() throws Exception {
@@ -147,20 +152,24 @@ public class TestNodeController {
ExecutionTestUtil.tearDown(cleanupOnStop);
}
- public org.apache.asterix.common.transactions.JobId getTxnJobId() {
- return new org.apache.asterix.common.transactions.JobId((int)
jobId.getId());
+ public org.apache.asterix.common.transactions.JobId
getTxnJobId(IHyracksTaskContext ctx) {
+ return new org.apache.asterix.common.transactions.JobId((int)
ctx.getJobletContext().getJobId().getId());
}
public Pair<LSMInsertDeleteOperatorNodePushable, CommitRuntime>
getInsertPipeline(IHyracksTaskContext ctx,
- Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
ARecordType metaType,
- ILSMMergePolicyFactory mergePolicyFactory, Map<String, String>
mergePolicyProperties, int[] filterFields,
+ Dataset dataset, IAType[] primaryKeyTypes, ARecordType recordType,
ARecordType metaType, int[] filterFields,
int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators,
- StorageComponentProvider storageComponentProvider) throws
AlgebricksException, HyracksDataException {
+ StorageComponentProvider storageComponentProvider)
+ throws AlgebricksException, HyracksDataException, RemoteException,
ACIDException {
+ MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
+
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory,
Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset,
primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields,
primaryKeyIndexes, primaryKeyIndicators);
+ mergePolicy.first, mergePolicy.second, filterFields,
primaryKeyIndexes, primaryKeyIndicators);
IndexOperation op = IndexOperation.INSERT;
IModificationOperationCallbackFactory modOpCallbackFactory =
- new
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(),
dataset.getDatasetId(),
+ new
PrimaryIndexModificationOperationCallbackFactory(getTxnJobId(ctx),
dataset.getDatasetId(),
primaryIndexInfo.primaryKeyIndexes,
TXN_SUBSYSTEM_PROVIDER, Operation.get(op),
ResourceType.LSM_BTREE);
IRecordDescriptorProvider recordDescProvider =
primaryIndexInfo.getInsertRecordDescriptorProvider();
@@ -170,7 +179,7 @@ public class TestNodeController {
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
recordDescProvider.getInputRecordDescriptor(new ActivityId(new
OperatorDescriptorId(0), 0), 0), op,
true, indexHelperFactory, modOpCallbackFactory, null);
- CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(),
dataset.getDatasetId(),
+ CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx),
dataset.getDatasetId(),
primaryIndexInfo.primaryKeyIndexes, false, true, PARTITION,
true);
insertOp.setOutputFrameWriter(0, commitOp, primaryIndexInfo.rDesc);
commitOp.setInputRecordDescriptor(0, primaryIndexInfo.rDesc);
@@ -204,8 +213,7 @@ public class TestNodeController {
}
public JobId newJobId() {
- jobId = new JobId(jobCounter++);
- return jobId;
+ return new JobId(jobCounter++);
}
public IResourceFactory getPrimaryResourceFactory(IHyracksTaskContext ctx,
PrimaryIndexInfo primaryIndexInfo,
@@ -225,18 +233,22 @@ public class TestNodeController {
}
public PrimaryIndexInfo createPrimaryIndex(Dataset dataset, IAType[]
primaryKeyTypes, ARecordType recordType,
- ARecordType metaType, ILSMMergePolicyFactory mergePolicyFactory,
Map<String, String> mergePolicyProperties,
- int[] filterFields, IStorageComponentProvider
storageComponentProvider, int[] primaryKeyIndexes,
- List<Integer> primaryKeyIndicators) throws AlgebricksException,
HyracksDataException {
+ ARecordType metaType, int[] filterFields,
IStorageComponentProvider storageComponentProvider,
+ int[] primaryKeyIndexes, List<Integer> primaryKeyIndicators)
+ throws AlgebricksException, HyracksDataException, RemoteException,
ACIDException {
+ MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
+
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory,
Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset,
primaryKeyTypes, recordType, metaType,
- mergePolicyFactory, mergePolicyProperties, filterFields,
primaryKeyIndexes, primaryKeyIndicators);
+ mergePolicy.first, mergePolicy.second, filterFields,
primaryKeyIndexes, primaryKeyIndicators);
Dataverse dataverse = new Dataverse(dataset.getDataverseName(),
NonTaggedDataFormat.class.getName(),
MetadataUtil.PENDING_NO_OP);
MetadataProvider mdProvider = new MetadataProvider(
(ICcApplicationContext)
ExecutionTestUtil.integrationUtil.cc.getApplicationContext(), dataverse);
try {
IResourceFactory resourceFactory =
dataset.getResourceFactory(mdProvider, primaryIndexInfo.index,
- recordType, metaType, mergePolicyFactory,
mergePolicyProperties);
+ recordType, metaType, mergePolicy.first,
mergePolicy.second);
IndexBuilderFactory indexBuilderFactory =
new
IndexBuilderFactory(storageComponentProvider.getStorageManager(),
primaryIndexInfo.getFileSplitProvider(),
resourceFactory, !dataset.isTemp());
@@ -283,6 +295,10 @@ public class TestNodeController {
if (withMessaging) {
TaskUtil.put(HyracksConstants.KEY_MESSAGE, new VSizeFrame(ctx),
ctx);
}
+ JobId jobId = newJobId();
+ IHyracksJobletContext jobletCtx =
Mockito.mock(IHyracksJobletContext.class);
+
Mockito.when(jobletCtx.getServiceContext()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getContext());
+ Mockito.when(jobletCtx.getJobId()).thenReturn(jobId);
ctx = Mockito.spy(ctx);
Mockito.when(ctx.getJobletContext()).thenReturn(jobletCtx);
Mockito.when(ctx.getIoManager()).thenReturn(ExecutionTestUtil.integrationUtil.ncs[0].getIoManager());
@@ -410,4 +426,84 @@ public class TestNodeController {
(CcApplicationContext)
ExecutionTestUtil.integrationUtil.cc.getApplicationContext();
return appCtx.getStorageManager();
}
+
+ public Pair<LSMPrimaryUpsertOperatorNodePushable, CommitRuntime>
getUpsertPipeline(IHyracksTaskContext ctx,
+ Dataset dataset, IAType[] keyTypes, ARecordType recordType,
ARecordType metaType, int[] filterFields,
+ int[] keyIndexes, List<Integer> keyIndicators,
StorageComponentProvider storageComponentProvider,
+ IFrameOperationCallbackFactory frameOpCallbackFactory, boolean
hasSecondaries) throws Exception {
+ MetadataTransactionContext mdTxnCtx =
MetadataManager.INSTANCE.beginTransaction();
+
org.apache.hyracks.algebricks.common.utils.Pair<ILSMMergePolicyFactory,
Map<String, String>> mergePolicy =
+ DatasetUtil.getMergePolicyFactory(dataset, mdTxnCtx);
+ MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
+ PrimaryIndexInfo primaryIndexInfo = new PrimaryIndexInfo(dataset,
keyTypes, recordType, metaType,
+ mergePolicy.first, mergePolicy.second, filterFields,
keyIndexes, keyIndicators);
+ IModificationOperationCallbackFactory modificationCallbackFactory =
dataset.getModificationCallbackFactory(
+ storageComponentProvider, primaryIndexInfo.index,
getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+ ISearchOperationCallbackFactory searchCallbackFactory =
dataset.getSearchCallbackFactory(
+ storageComponentProvider, primaryIndexInfo.index,
getTxnJobId(ctx), IndexOperation.UPSERT, keyIndexes);
+ IRecordDescriptorProvider recordDescProvider =
primaryIndexInfo.getInsertRecordDescriptorProvider();
+ IIndexDataflowHelperFactory indexHelperFactory = new
IndexDataflowHelperFactory(
+ storageComponentProvider.getStorageManager(),
primaryIndexInfo.getFileSplitProvider());
+ LSMPrimaryUpsertOperatorNodePushable insertOp = new
LSMPrimaryUpsertOperatorNodePushable(ctx, PARTITION,
+ indexHelperFactory,
primaryIndexInfo.primaryIndexInsertFieldsPermutations,
+ recordDescProvider.getInputRecordDescriptor(new ActivityId(new
OperatorDescriptorId(0), 0), 0),
+ modificationCallbackFactory, searchCallbackFactory,
keyIndexes.length, recordType, -1,
+ frameOpCallbackFactory == null ?
dataset.getFrameOpCallbackFactory() : frameOpCallbackFactory,
+ MissingWriterFactory.INSTANCE, hasSecondaries);
+ RecordDescriptor upsertOutRecDesc =
getUpsertOutRecDesc(primaryIndexInfo.rDesc, dataset,
+ filterFields == null ? 0 : filterFields.length, recordType,
metaType);
+ // fix pk fields
+ int diff = upsertOutRecDesc.getFieldCount() -
primaryIndexInfo.rDesc.getFieldCount();
+ int[] pkFieldsInCommitOp = new int[dataset.getPrimaryKeys().size()];
+ for (int i = 0; i < pkFieldsInCommitOp.length; i++) {
+ pkFieldsInCommitOp[i] = diff + i;
+ }
+ CommitRuntime commitOp = new CommitRuntime(ctx, getTxnJobId(ctx),
dataset.getDatasetId(), pkFieldsInCommitOp,
+ false, true, PARTITION, true);
+ insertOp.setOutputFrameWriter(0, commitOp, upsertOutRecDesc);
+ commitOp.setInputRecordDescriptor(0, upsertOutRecDesc);
+ return Pair.of(insertOp, commitOp);
+ }
+
+ private RecordDescriptor getUpsertOutRecDesc(RecordDescriptor
inputRecordDesc, Dataset dataset, int numFilterFields,
+ ARecordType itemType, ARecordType metaItemType) throws Exception {
+ ITypeTraits[] outputTypeTraits =
+ new ITypeTraits[inputRecordDesc.getFieldCount() +
(dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+ ISerializerDeserializer<?>[] outputSerDes = new
ISerializerDeserializer[inputRecordDesc.getFieldCount()
+ + (dataset.hasMetaPart() ? 2 : 1) + numFilterFields];
+
+ // add the previous record first
+ int f = 0;
+ outputSerDes[f] =
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(itemType);
+ f++;
+ // add the previous meta second
+ if (dataset.hasMetaPart()) {
+ outputSerDes[f] =
FormatUtils.getDefaultFormat().getSerdeProvider().getSerializerDeserializer(metaItemType);
+ outputTypeTraits[f] =
FormatUtils.getDefaultFormat().getTypeTraitProvider().getTypeTrait(metaItemType);
+ f++;
+ }
+ // add the previous filter third
+ int fieldIdx = -1;
+ if (numFilterFields > 0) {
+ String filterField = DatasetUtil.getFilterField(dataset).get(0);
+ String[] fieldNames = itemType.getFieldNames();
+ int i = 0;
+ for (; i < fieldNames.length; i++) {
+ if (fieldNames[i].equals(filterField)) {
+ break;
+ }
+ }
+ fieldIdx = i;
+ outputTypeTraits[f] =
FormatUtils.getDefaultFormat().getTypeTraitProvider()
+ .getTypeTrait(itemType.getFieldTypes()[fieldIdx]);
+ outputSerDes[f] = FormatUtils.getDefaultFormat().getSerdeProvider()
+
.getSerializerDeserializer(itemType.getFieldTypes()[fieldIdx]);
+ f++;
+ }
+ for (int j = 0; j < inputRecordDesc.getFieldCount(); j++) {
+ outputTypeTraits[j + f] = inputRecordDesc.getTypeTraits()[j];
+ outputSerDes[j + f] = inputRecordDesc.getFields()[j];
+ }
+ return new RecordDescriptor(outputSerDes, outputTypeTraits);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
index 8d21b55..c50a4a2 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/active/Actor.java
@@ -18,8 +18,8 @@
*/
package org.apache.asterix.test.active;
-import org.apache.asterix.active.SingleThreadEventProcessor;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.hyracks.api.util.SingleThreadEventProcessor;
public class Actor extends SingleThreadEventProcessor<Action> {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
index 76bec8c..36cb4bb 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/ComponentRollbackTest.java
@@ -132,12 +132,12 @@ public class ComponentRollbackTest {
public void createIndex() throws Exception {
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new InternalDatasetDetails(null,
PartitioningStrategy.HASH,
+ dataset = new TestDataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new
InternalDatasetDetails(null, PartitioningStrategy.HASH,
partitioningKeys, null, null, null, false, null,
false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
- PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset,
KEY_TYPES, RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null, storageManager,
KEY_INDEXES, KEY_INDICATORS_LIST);
+ PrimaryIndexInfo primaryIndexInfo = nc.createPrimaryIndex(dataset,
KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
IndexDataflowHelperFactory iHelperFactory =
new IndexDataflowHelperFactory(nc.getStorageManager(),
primaryIndexInfo.getFileSplitProvider());
ctx = nc.createTestContext(false);
@@ -146,9 +146,9 @@ public class ComponentRollbackTest {
lsmBtree = (TestLsmBtree) indexDataflowHelper.getIndexInstance();
indexDataflowHelper.close();
nc.newJobId();
- txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
storageManager).getLeft();
+ txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager).getLeft();
}
@After
@@ -174,7 +174,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -224,7 +224,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -254,9 +254,9 @@ public class ComponentRollbackTest {
// insert again
nc.newJobId();
- txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
storageManager).getLeft();
+ txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+ insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
+ KEY_INDICATORS_LIST, storageManager).getLeft();
insertOp.open();
for (int j = 0; j < RECORDS_PER_COMPONENT; j++) {
ITupleReference tuple = tupleGenerator.next();
@@ -291,7 +291,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -360,7 +360,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -416,7 +416,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -481,7 +481,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -541,7 +541,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -603,7 +603,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
@@ -675,7 +675,7 @@ public class ComponentRollbackTest {
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new FrameTupleAppender(frame);
for (int j = 0; j < TOTAL_NUM_OF_RECORDS; j++) {
- // flush every 1000 records
+ // flush every RECORDS_PER_COMPONENT records
if (j % RECORDS_PER_COMPONENT == 0 && j + 1 !=
TOTAL_NUM_OF_RECORDS) {
if (tupleAppender.getTupleCount() > 0) {
tupleAppender.write(insertOp, true);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
index a0e3aa9..82eb16a 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/LogMarkerTest.java
@@ -110,20 +110,19 @@ public class LogMarkerTest {
StorageComponentProvider storageManager = new
StorageComponentProvider();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new
InternalDatasetDetails(null, PartitioningStrategy.HASH,
partitioningKeys, null, null, null, false, null,
false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset,
KEY_TYPES, RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null,
storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
+ PrimaryIndexInfo indexInfo = nc.createPrimaryIndex(dataset,
KEY_TYPES, RECORD_TYPE, META_TYPE, null,
+ storageManager, KEY_INDEXES, KEY_INDICATORS_LIST);
IHyracksTaskContext ctx = nc.createTestContext(true);
nc.newJobId();
- ITransactionContext txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
- LSMInsertDeleteOperatorNodePushable insertOp = nc
- .getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATORS_LIST,
storageManager)
- .getLeft();
+ ITransactionContext txnCtx =
+
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
+ LSMInsertDeleteOperatorNodePushable insertOp =
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATORS_LIST, storageManager).getLeft();
insertOp.open();
TupleGenerator tupleGenerator = new
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATORS,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS,
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
index 41b3d38..5384c92 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java
@@ -119,21 +119,20 @@ public class CheckpointingTest {
nc.init();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME,
- NODE_GROUP_NAME, null, null, new
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new
InternalDatasetDetails(null, PartitioningStrategy.HASH,
partitioningKeys, null, null, null, false, null,
false),
null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, new NoMergePolicyFactory(), null,
- null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, null, storageManager, KEY_INDEXES,
+ KEY_INDICATOR_LIST);
IHyracksTaskContext ctx = nc.createTestContext(false);
nc.newJobId();
- ITransactionContext txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ ITransactionContext txnCtx =
+
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp = nc
- .getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE, new NoMergePolicyFactory(),
- null, null, KEY_INDEXES, KEY_INDICATOR_LIST,
storageManager)
- .getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp =
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATOR_LIST, storageManager).getLeft();
insertOp.open();
TupleGenerator tupleGenerator = new
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS,
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
index 58697a9..eb47248 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java
@@ -119,26 +119,23 @@ public class DiskIsFullTest {
StorageComponentProvider storageManager = new
StorageComponentProvider();
List<List<String>> partitioningKeys = new ArrayList<>();
partitioningKeys.add(Collections.singletonList("key"));
- Dataset dataset =
- new Dataset(DATAVERSE_NAME, DATASET_NAME, DATAVERSE_NAME,
DATA_TYPE_NAME, NODE_GROUP_NAME, null,
- null,
- new InternalDatasetDetails(null,
PartitioningStrategy.HASH, partitioningKeys, null, null,
- null, false, null, false), null,
DatasetType.INTERNAL, DATASET_ID, 0);
+ Dataset dataset = new Dataset(DATAVERSE_NAME, DATASET_NAME,
DATAVERSE_NAME, DATA_TYPE_NAME, NODE_GROUP_NAME,
+ NoMergePolicyFactory.NAME, null, new
InternalDatasetDetails(null, PartitioningStrategy.HASH,
+ partitioningKeys, null, null, null, false, null,
false),
+ null, DatasetType.INTERNAL, DATASET_ID, 0);
try {
- nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, new NoMergePolicyFactory(), null,
- null, storageManager, KEY_INDEXES, KEY_INDICATOR_LIST);
+ nc.createPrimaryIndex(dataset, KEY_TYPES, RECORD_TYPE,
META_TYPE, null, storageManager, KEY_INDEXES,
+ KEY_INDICATOR_LIST);
IHyracksTaskContext ctx = nc.createTestContext(false);
nc.newJobId();
- ITransactionContext txnCtx =
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(), true);
+ ITransactionContext txnCtx =
+
nc.getTransactionManager().getTransactionContext(nc.getTxnJobId(ctx), true);
// Prepare insert operation
- LSMInsertDeleteOperatorNodePushable insertOp =
- nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
RECORD_TYPE, META_TYPE,
- new NoMergePolicyFactory(), null, null,
KEY_INDEXES, KEY_INDICATOR_LIST, storageManager)
- .getLeft();
+ LSMInsertDeleteOperatorNodePushable insertOp =
nc.getInsertPipeline(ctx, dataset, KEY_TYPES,
+ RECORD_TYPE, META_TYPE, null, KEY_INDEXES,
KEY_INDICATOR_LIST, storageManager).getLeft();
insertOp.open();
- TupleGenerator tupleGenerator =
- new TupleGenerator(RECORD_TYPE, META_TYPE,
KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION,
- UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION,
UNIQUE_META_FIELDS);
+ TupleGenerator tupleGenerator = new
TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR,
+ RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS,
META_GEN_FUNCTION, UNIQUE_META_FIELDS);
VSizeFrame frame = new VSizeFrame(ctx);
FrameTupleAppender tupleAppender = new
FrameTupleAppender(frame);
// Insert records until disk becomes full
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 895cd55..6d3b6c2 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -229,7 +229,6 @@ public class ErrorCode {
public static final int ACTIVE_ENTITY_LISTENER_IS_NOT_REGISTERED = 3097;
public static final int CANNOT_DERIGESTER_ACTIVE_ENTITY_LISTENER = 3098;
public static final int
DOUBLE_INITIALIZATION_OF_ACTIVE_NOTIFICATION_HANDLER = 3099;
- public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 3100;
public static final int DOUBLE_RECOVERY_ATTEMPTS = 3101;
public static final int UNREPORTED_TASK_FAILURE_EXCEPTION = 3102;
public static final int ACTIVE_ENTITY_ALREADY_SUSPENDED = 3103;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 98b8e2f..e428721 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -218,7 +218,6 @@
3097 = Active Entity %1$s has not been registered
3098 = Cannot deregister %1$s because it is active
3099 = Attempt to initialize an initialized Active Notification Handler
-3100 = Failed to shutdown event processor for %1$s
3101 = Recovery request while recovery is currently ongoing
3102 = Unreported exception causing task failure
3103 = %1$s is already suspended and has state %2$s
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
index a071345..42da1d7 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/exceptions/ErrorCode.java
@@ -133,6 +133,10 @@ public class ErrorCode {
public static final int ILLEGAL_ATTEMPT_TO_EXIT_EMPTY_COMPONENT = 97;
public static final int A_FLUSH_OPERATION_HAS_FAILED = 98;
public static final int A_MERGE_OPERATION_HAS_FAILED = 99;
+ public static final int FAILED_TO_SHUTDOWN_EVENT_PROCESSOR = 100;
+ public static final int PAGE_DOES_NOT_EXIST_IN_FILE = 101;
+ public static final int VBC_ALREADY_OPEN = 102;
+ public static final int VBC_ALREADY_CLOSED = 103;
// Compilation error codes.
public static final int RULECOLLECTION_NOT_INSTANCE_OF_LIST = 10000;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
new file mode 100644
index 0000000..21965c7
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/util/SingleThreadEventProcessor.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.api.util;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.hyracks.api.exceptions.ErrorCode;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+/**
+ * Base class for a processor that handles events of type {@code T} one at a
+ * time on a single dedicated thread. Events are queued through
+ * {@link #add(Object)} and delivered, in queue order, to
+ * {@link #handle(Object)} by the thread that the constructor creates and starts.
+ *
+ * @param <T>
+ *            the type of the events handled by this processor
+ */
+public abstract class SingleThreadEventProcessor<T> implements Runnable {
+
+ private static final Logger LOGGER =
Logger.getLogger(SingleThreadEventProcessor.class.getName());
+ // name passed at construction; only used here when reporting a failed shutdown
+ private final String name;
+ // pending events; created without a capacity argument
+ private final LinkedBlockingQueue<T> eventInbox;
+ // the thread executing run(); created and started by the constructor
+ private volatile Thread executorThread;
+ // set by stop(); checked by run() before each take
+ private volatile boolean stopped = false;
+
+ /**
+ * Creates the event queue and immediately starts the thread that executes
+ * {@link #run()}.
+ *
+ * @param threadName
+ *            the name given to the executor thread; also reported if
+ *            {@link #stop()} fails
+ */
+ public SingleThreadEventProcessor(String threadName) {
+ this.name = threadName;
+ eventInbox = new LinkedBlockingQueue<>();
+ executorThread = new Thread(this, threadName);
+ executorThread.start();
+ }
+
+ /**
+ * Event loop: until {@code stopped} is set, blocks for the next queued event
+ * and passes it to {@link #handle(Object)}. Any non-interrupt exception is
+ * logged at SEVERE and the loop continues with the next event.
+ */
+ @Override
+ public final void run() {
+ LOGGER.log(Level.INFO, "Started " + Thread.currentThread().getName());
+ while (!stopped) {
+ try {
+ T event = eventInbox.take();
+ handle(event);
+ } catch (InterruptedException e) {
+ // restore the interrupt flag and go back to the stopped check
+ // NOTE(review): if the interrupt did not come from stop(), stopped is still
+ // false and the flag remains set, so take() presumably throws again at once
+ // and this loop spins - confirm whether that case can occur
+ Thread.currentThread().interrupt();
+ } catch (Exception e) {
+ LOGGER.log(Level.SEVERE, "Error handling an event", e);
+ }
+ }
+ LOGGER.log(Level.WARNING, "Stopped " +
Thread.currentThread().getName());
+ }
+
+ /**
+ * Handles a single event. Called from {@link #run()} on the executor thread.
+ *
+ * @param event
+ *            the event taken from the queue
+ * @throws Exception
+ *             any failure; {@link #run()} logs it and moves on to the next event
+ */
+ protected abstract void handle(T event) throws Exception; //NOSONAR
+
+ /**
+ * Queues an event for processing.
+ *
+ * @param event
+ *            the event to queue
+ * @throws IllegalStateException
+ *             if the queue does not accept the event
+ */
+ public void add(T event) {
+ if (!eventInbox.add(event)) {
+ throw new IllegalStateException();
+ }
+ }
+
+ /**
+ * Requests the executor thread to stop and waits for it to terminate. The
+ * thread is interrupted and joined for up to one second; while it is still
+ * alive this is repeated, logging a warning per attempt.
+ *
+ * @throws HyracksDataException
+ *             with {@code FAILED_TO_SHUTDOWN_EVENT_PROCESSOR} if the thread is
+ *             still alive when the 10th attempt is reached
+ * @throws InterruptedException
+ *             if the calling thread is interrupted while joining
+ */
+ public void stop() throws HyracksDataException, InterruptedException {
+ stopped = true;
+ executorThread.interrupt();
+ executorThread.join(1000);
+ int attempt = 0;
+ while (executorThread.isAlive()) {
+ attempt++;
+ LOGGER.log(Level.WARNING,
+ "Failed to stop event processor after " + attempt + "
attempts. Interrupted exception swallowed?");
+ if (attempt == 10) {
+ throw
HyracksDataException.create(ErrorCode.FAILED_TO_SHUTDOWN_EVENT_PROCESSOR, name);
+ }
+ // interrupt again in case the previous interrupt was consumed inside handle()
+ executorThread.interrupt();
+ executorThread.join(1000);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
index ae579b4..bc83f8c 100644
---
a/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
+++
b/hyracks-fullstack/hyracks/hyracks-api/src/main/resources/errormsg/en.properties
@@ -116,5 +116,9 @@
97 = Illegal attempt to exit empty component
98 = A flush operation has failed
99 = A merge operation has failed
+100 = Failed to shutdown event processor for %1$s
+101 = Page %1$s does not exist in file %2$s
+102 = Failed to open virtual buffer cache since it is already open
+103 = Failed to close virtual buffer cache since it is already closed
10000 = The given rule collection %1$s is not an instance of the List class.
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
index 77f18ea..05417a8 100644
---
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
+++
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/api/AbstractPointable.java
@@ -25,19 +25,6 @@ public abstract class AbstractPointable implements
IPointable {
protected int length;
- /**
- * copies the content of this pointable to the passed byte array.
- * the array is expected to be at least of length = length of this
pointable
- *
- * @param copy
- * the array to write into
- * @throws ArrayIndexOutOfBoundsException
- * if the passed array size is smaller than length
- */
- public void copyInto(byte[] copy) {
- System.arraycopy(bytes, start, copy, 0, length);
- }
-
@Override
public void set(byte[] bytes, int start, int length) {
this.bytes = bytes;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
new file mode 100644
index 0000000..5de0b84
--- /dev/null
+++
b/hyracks-fullstack/hyracks/hyracks-data/hyracks-data-std/src/main/java/org/apache/hyracks/data/std/util/DataUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.data.std.util;
+
+import org.apache.hyracks.data.std.api.IValueReference;
+
+public class DataUtils {
+
+ private DataUtils() {
+ }
+
+ /**
+ * Copies the content of this pointable to the passed byte array.
+ * the array is expected to be at least of length = length of this
pointable
+ *
+ * @param value
+ * the value to be copied
+ * @param copy
+ * the array to write into
+ * @throws ArrayIndexOutOfBoundsException
+ * if the passed array size is smaller than length
+ */
+ public static void copyInto(IValueReference value, byte[] copy) {
+ System.arraycopy(value.getByteArray(), value.getStartOffset(), copy,
0, value.getLength());
+ }
+
+ /**
+ * Copies the content of this pointable to the passed byte array.
+ * the array is expected to be at least of length = offset + length of
this pointable
+ *
+ * @param value
+ * the value to be copied
+ * @param copy
+ * the array to write into
+ * @param offset
+ * the offset to start writing from
+ * @throws ArrayIndexOutOfBoundsException
+ * if the passed array size - offset is smaller than length
+ */
+ public static void copyInto(IValueReference value, byte[] copy, int
offset) {
+ System.arraycopy(value.getByteArray(), value.getStartOffset(), copy,
offset, value.getLength());
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
index 5e3042e..1fbcbb0 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-btree/src/main/java/org/apache/hyracks/storage/am/btree/impls/BTree.java
@@ -1135,7 +1135,7 @@ public class BTree extends AbstractTreeIndex {
((IBTreeInteriorFrame) interiorFrame).deleteGreatest();
int finalPageId = freePageManager.takePage(metaFrame);
- bufferCache.setPageDiskId(frontier.page,
BufferedFileHandle.getDiskPageId(getFileId(), finalPageId));
+
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
finalPageId));
pagesToWrite.add(frontier.page);
splitKey.setLeftPage(finalPageId);
@@ -1156,7 +1156,7 @@ public class BTree extends AbstractTreeIndex {
if (level < 1) {
ICachedPage lastLeaf = nodeFrontiers.get(level).page;
int lastLeafPage = nodeFrontiers.get(level).pageId;
- setPageDpid(lastLeaf, nodeFrontiers.get(level).pageId);
+
lastLeaf.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
nodeFrontiers.get(level).pageId));
queue.put(lastLeaf);
nodeFrontiers.get(level).page = null;
persistFrontiers(level + 1, lastLeafPage);
@@ -1171,7 +1171,7 @@ public class BTree extends AbstractTreeIndex {
}
((IBTreeInteriorFrame)
interiorFrame).setRightmostChildPageId(rightPage);
int finalPageId = freePageManager.takePage(metaFrame);
- setPageDpid(frontier.page, finalPageId);
+
frontier.page.setDiskPageId(BufferedFileHandle.getDiskPageId(getFileId(),
finalPageId));
queue.put(frontier.page);
frontier.pageId = finalPageId;
@@ -1193,10 +1193,6 @@ public class BTree extends AbstractTreeIndex {
public void abort() throws HyracksDataException {
super.handleException();
}
-
- private void setPageDpid(ICachedPage page, int pageId) {
- bufferCache.setPageDiskId(page,
BufferedFileHandle.getDiskPageId(getFileId(), pageId));
- }
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
index 1ee48f6..a051364 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/AppendOnlyLinkedMetadataPageManager.java
@@ -219,7 +219,7 @@ public class AppendOnlyLinkedMetadataPageManager implements
IMetadataPageManager
confiscatedPage.releaseWriteLatch(false);
}
int finalMetaPage = getMaxPageId(metaFrame) + 1;
- bufferCache.setPageDiskId(confiscatedPage,
BufferedFileHandle.getDiskPageId(fileId, finalMetaPage));
+
confiscatedPage.setDiskPageId(BufferedFileHandle.getDiskPageId(fileId,
finalMetaPage));
queue.put(confiscatedPage);
bufferCache.finishQueue();
metadataPage = getMetadataPageId();
@@ -345,8 +345,10 @@ public class AppendOnlyLinkedMetadataPageManager
implements IMetadataPageManager
try {
frame.setPage(page);
int inPageOffset = frame.getOffset(key);
- return inPageOffset >= 0 ? ((long) pageId *
bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
- + IBufferCache.RESERVED_HEADER_BYTES : -1L;
+ return inPageOffset >= 0
+ ? ((long) pageId *
bufferCache.getPageSizeWithHeader()) + frame.getOffset(key)
+ + IBufferCache.RESERVED_HEADER_BYTES
+ : -1L;
} finally {
page.releaseReadLatch();
unpinPage(page);
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
index 686fe78..d8afd12 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/freepage/LinkedMetaDataPageManager.java
@@ -195,7 +195,7 @@ public class LinkedMetaDataPageManager implements
IMetadataPageManager {
metaFrame.setMaxPage(1);
} finally {
metaNode.releaseWriteLatch(true);
- bufferCache.flushDirtyPage(metaNode);
+ bufferCache.flush(metaNode);
bufferCache.unpin(metaNode);
}
int rootPage = getRootPageId();
@@ -207,7 +207,7 @@ public class LinkedMetaDataPageManager implements
IMetadataPageManager {
leafFrame.initBuffer((byte) 0);
} finally {
rootNode.releaseWriteLatch(true);
- bufferCache.flushDirtyPage(rootNode);
+ bufferCache.flush(rootNode);
bufferCache.unpin(rootNode);
}
}
@@ -249,7 +249,7 @@ public class LinkedMetaDataPageManager implements
IMetadataPageManager {
metaFrame.setValid(true);
} finally {
metaNode.releaseWriteLatch(true);
- bufferCache.flushDirtyPage(metaNode);
+ bufferCache.flush(metaNode);
bufferCache.unpin(metaNode);
ready = true;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
index ae66402..f03a358 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-common/src/main/java/org/apache/hyracks/storage/am/common/impls/AbstractTreeIndex.java
@@ -342,4 +342,9 @@ public abstract class AbstractTreeIndex implements
ITreeIndex {
public boolean hasMemoryComponents() {
return true;
}
+
+ @Override
+ public String toString() {
+ return "{\"class\":\"" + getClass().getSimpleName() + "\",\"file\":\""
+ file.getRelativePath() + "\"}";
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
index 280cc52..0b59e91 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMDiskComponent.java
@@ -197,4 +197,9 @@ public abstract class AbstractLSMDiskComponent extends
AbstractLSMComponent impl
.addBulkLoader(createIndexBulkLoader(fillFactor, verifyInput,
numElementsHint, checkIfEmptyIndex));
return chainedBulkLoader;
}
+
+    /**
+     * Describes this disk component as a JSON-style object holding the
+     * simple name of the concrete class and the description of its index.
+     * The index description is embedded as-is (unquoted), so the result is
+     * well-formed only if the index's own {@code toString()} yields a JSON
+     * value.
+     */
+    @Override
+    public String toString() {
+        // The class name is a JSON string value, so it needs both an
+        // opening and a closing quote.
+        return "{\"class\":\"" + getClass().getSimpleName()
+                + "\", \"index\":" + getIndex().toString() + "}";
+    }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
index a4fe35c..57db635 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/AbstractLSMMemoryComponent.java
@@ -255,6 +255,6 @@ public abstract class AbstractLSMMemoryComponent extends
AbstractLSMComponent im
@Override
public long getSize() {
IBufferCache virtualBufferCache = getIndex().getBufferCache();
- return virtualBufferCache.getNumPages() * (long)
virtualBufferCache.getPageSize();
+ return virtualBufferCache.getPageBudget() * (long)
virtualBufferCache.getPageSize();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
index b7d2ea3..d1244ce 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/DiskComponentMetadata.java
@@ -45,7 +45,7 @@ public class DiskComponentMetadata implements
IComponentMetadata {
@Override
public IValueReference get(IValueReference key) throws
HyracksDataException {
- IPointable value = VoidPointable.FACTORY.createPointable();
+ VoidPointable value = VoidPointable.FACTORY.createPointable();
get(key, value);
return value;
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
index dcc9355..1b827b7 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MemoryComponentMetadata.java
@@ -20,6 +20,8 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.ArrayList;
import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,6 +33,7 @@ import
org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.am.lsm.common.api.IComponentMetadata;
public class MemoryComponentMetadata implements IComponentMetadata {
+ private static final Logger LOGGER =
Logger.getLogger(MemoryComponentMetadata.class.getName());
private static final byte[] empty = new byte[0];
private final List<org.apache.commons.lang3.tuple.Pair<IValueReference,
ArrayBackedValueStorage>> store =
new ArrayList<>();
@@ -43,9 +46,9 @@ public class MemoryComponentMetadata implements
IComponentMetadata {
ArrayBackedValueStorage stored = get(key);
if (stored == null) {
stored = new ArrayBackedValueStorage();
+ store.add(Pair.of(key, stored));
}
stored.assign(value);
- store.add(Pair.of(key, stored));
}
/**
@@ -71,8 +74,12 @@ public class MemoryComponentMetadata implements
IComponentMetadata {
}
public void copy(IMetadataPageManager mdpManager) throws
HyracksDataException {
+ LOGGER.log(Level.INFO, "Copying Metadata into a different component");
ITreeIndexMetadataFrame frame = mdpManager.createMetadataFrame();
for (Pair<IValueReference, ArrayBackedValueStorage> pair : store) {
+ if (LOGGER.isLoggable(Level.INFO)) {
+ LOGGER.log(Level.INFO, "Copying " + pair.getKey() + " : " +
pair.getValue().getLength() + " bytes");
+ }
mdpManager.put(frame, pair.getKey(), pair.getValue());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
index 83e140c..7a3d58b 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/MultitenantVirtualBufferCache.java
@@ -20,8 +20,6 @@ package org.apache.hyracks.storage.am.lsm.common.impls;
import java.util.HashMap;
import java.util.Map;
-import java.util.logging.Level;
-import java.util.logging.Logger;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
@@ -34,7 +32,6 @@ import org.apache.hyracks.storage.common.file.IFileMapManager;
import org.apache.hyracks.util.JSONUtil;
public class MultitenantVirtualBufferCache implements IVirtualBufferCache {
- private static final Logger LOGGER =
Logger.getLogger(ExternalIndexHarness.class.getName());
private final IVirtualBufferCache vbc;
private int openCount;
@@ -65,11 +62,6 @@ public class MultitenantVirtualBufferCache implements
IVirtualBufferCache {
}
@Override
- public ICachedPage tryPin(long dpid) throws HyracksDataException {
- return vbc.tryPin(dpid);
- }
-
- @Override
public ICachedPage pin(long dpid, boolean newPage) throws
HyracksDataException {
return vbc.pin(dpid, newPage);
}
@@ -80,8 +72,8 @@ public class MultitenantVirtualBufferCache implements
IVirtualBufferCache {
}
@Override
- public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
- vbc.flushDirtyPage(page);
+ public void flush(ICachedPage page) throws HyracksDataException {
+ vbc.flush(page);
}
@Override
@@ -100,8 +92,8 @@ public class MultitenantVirtualBufferCache implements
IVirtualBufferCache {
}
@Override
- public int getNumPages() {
- return vbc.getNumPages();
+ public int getPageBudget() {
+ return vbc.getPageBudget();
}
@Override
@@ -141,14 +133,6 @@ public class MultitenantVirtualBufferCache implements
IVirtualBufferCache {
}
@Override
- public void adviseWontNeed(ICachedPage page) {
- if (LOGGER.isLoggable(Level.INFO)) {
- LOGGER.log(Level.INFO, "Calling adviseWontNeed on " +
this.getClass().getName()
- + " makes no sense as this BufferCache cannot evict
pages");
- }
- }
-
- @Override
public ICachedPage confiscatePage(long dpid) throws HyracksDataException {
return vbc.confiscatePage(dpid);
}
@@ -175,11 +159,6 @@ public class MultitenantVirtualBufferCache implements
IVirtualBufferCache {
}
@Override
- public void setPageDiskId(ICachedPage page, long dpid) {
-
- }
-
- @Override
public void returnPage(ICachedPage page, boolean reinsert) {
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/fbf3c0a9/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
----------------------------------------------------------------------
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
index 3195f57..8ce636b 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/impls/NoMergePolicyFactory.java
@@ -31,13 +31,13 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicyFactory;
public class NoMergePolicyFactory implements ILSMMergePolicyFactory {
private static final long serialVersionUID = 1L;
-
private static final String[] SET_VALUES = new String[] {};
private static final Set<String> PROPERTIES_NAMES = new
HashSet<>(Arrays.asList(SET_VALUES));
+ public static final String NAME = "no-merge";
@Override
public String getName() {
- return "no-merge";
+ return NAME;
}
@Override