Repository: asterixdb Updated Branches: refs/heads/master f9e6bae98 -> c5a0a1974
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java new file mode 100644 index 0000000..e376ff9 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTracker.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.asterix.common.context.DatasetInfo; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.LogRecord; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.storage.am.lsm.btree.impl.ITestOpCallback; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator; + +public class TestPrimaryIndexOperationTracker extends PrimaryIndexOperationTracker { + + private final List<ITestOpCallback<Void>> callbacks = new ArrayList<>(); + + public TestPrimaryIndexOperationTracker(int datasetID, ILogManager logManager, DatasetInfo dsInfo, + ILSMComponentIdGenerator idGenerator) { + super(datasetID, logManager, dsInfo, idGenerator); + } + + public void addCallback(ITestOpCallback<Void> callback) { + synchronized (callbacks) { + callbacks.add(callback); + } + } + + public void clearCallbacks() { + synchronized (callbacks) { + callbacks.clear(); + } + } + + @Override + public void triggerScheduleFlush(LogRecord logRecord) throws HyracksDataException { + synchronized (callbacks) { + for (ITestOpCallback<Void> callback : callbacks) { + callback.before(null); + } + } + super.triggerScheduleFlush(logRecord); + synchronized (callbacks) { + for (ITestOpCallback<Void> callback : callbacks) { + callback.after(); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java new file mode 100644 index 0000000..5d7a7c6 --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestPrimaryIndexOperationTrackerFactory.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.test.dataflow; + +import java.lang.reflect.Field; +import java.lang.reflect.Modifier; + +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.context.DatasetLifecycleManager; +import org.apache.asterix.common.context.DatasetResource; +import org.apache.asterix.common.context.PrimaryIndexOperationTracker; +import org.apache.asterix.transaction.management.opcallbacks.PrimaryIndexOperationTrackerFactory; +import org.apache.hyracks.api.application.INCServiceContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; + +public class TestPrimaryIndexOperationTrackerFactory extends PrimaryIndexOperationTrackerFactory { + + private static final long serialVersionUID = 1L; + private int datasetId; + + public TestPrimaryIndexOperationTrackerFactory(int datasetId) { + super(datasetId); + this.datasetId = datasetId; + } + + @Override + public ILSMOperationTracker getOperationTracker(INCServiceContext ctx) { + try { + INcApplicationContext appCtx = (INcApplicationContext) ctx.getApplicationContext(); + DatasetLifecycleManager dslcManager = (DatasetLifecycleManager) appCtx.getDatasetLifecycleManager(); + DatasetResource dsr = dslcManager.getDatasetLifecycle(datasetId); + PrimaryIndexOperationTracker opTracker = dsr.getOpTracker(); + if (!(opTracker instanceof TestPrimaryIndexOperationTracker)) { + Field opTrackerField = DatasetResource.class.getDeclaredField("datasetPrimaryOpTracker"); + opTracker = new TestPrimaryIndexOperationTracker(datasetId, + appCtx.getTransactionSubsystem().getLogManager(), dsr.getDatasetInfo(), dsr.getIdGenerator()); + setFinal(opTrackerField, dsr, opTracker); + } + return opTracker; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + static void setFinal(Field field, Object obj, Object newValue) throws Exception { + field.setAccessible(true); + Field modifiersField = Field.class.getDeclaredField("modifiers"); + modifiersField.setAccessible(true); + modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL); + field.set(obj, newValue); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java index 30cfb4f..7543bf6 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/logging/CheckpointingTest.java @@ -134,7 +134,7 @@ public class CheckpointingTest { new TransactionOptions(ITransactionManager.AtomicityLevel.ENTITY_LEVEL)); // Prepare insert operation LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, - RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft(); + RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); insertOp.open(); TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java index dabf9d3..8897169 100644 --- a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/storage/DiskIsFullTest.java @@ -130,7 +130,7 @@ public class DiskIsFullTest { IHyracksTaskContext ctx = nc.createTestContext(jobId, 0, false); // Prepare insert operation LSMInsertDeleteOperatorNodePushable insertOp = nc.getInsertPipeline(ctx, dataset, KEY_TYPES, - RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager).getLeft(); + RECORD_TYPE, META_TYPE, null, KEY_INDEXES, KEY_INDICATOR_LIST, storageManager, null).getLeft(); insertOp.open(); TupleGenerator tupleGenerator = new TupleGenerator(RECORD_TYPE, META_TYPE, KEY_INDEXES, KEY_INDICATOR, RECORD_GEN_FUNCTION, UNIQUE_RECORD_FIELDS, META_GEN_FUNCTION, UNIQUE_META_FIELDS); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml b/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml new file mode 100644 index 0000000..aaeb244 --- /dev/null +++ b/asterixdb/asterix-app/src/test/resources/multi-partition-test-configuration.xml @@ -0,0 +1,112 @@ +<!-- + ! Licensed to the Apache Software Foundation (ASF) under one + ! or more contributor license agreements. See the NOTICE file + ! distributed with this work for additional information + ! regarding copyright ownership. The ASF licenses this file + ! to you under the Apache License, Version 2.0 (the + ! "License"); you may not use this file except in compliance + ! with the License. You may obtain a copy of the License at + ! + ! http://www.apache.org/licenses/LICENSE-2.0 + ! + ! Unless required by applicable law or agreed to in writing, + ! software distributed under the License is distributed on an + ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ! KIND, either express or implied. See the License for the + ! specific language governing permissions and limitations + ! under the License. + !--> +<asterixConfiguration xmlns="asterixconf"> + <metadataNode>asterix_nc1</metadataNode> + <store> + <ncId>asterix_nc1</ncId> + <storeDirs>iodevice0,iodevice1</storeDirs> + </store> + <store> + <ncId>asterix_nc2</ncId> + <storeDirs>iodevice0,iodevice1</storeDirs> + </store> + <transactionLogDir> + <ncId>asterix_nc1</ncId> + <txnLogDirPath>target/txnLogDir/asterix_nc1</txnLogDirPath> + </transactionLogDir> + <transactionLogDir> + <ncId>asterix_nc2</ncId> + <txnLogDirPath>target/txnLogDir/asterix_nc2</txnLogDirPath> + </transactionLogDir> + + <property> + <name>max.wait.active.cluster</name> + <value>60</value> + <description>Maximum wait (in seconds) for a cluster to be ACTIVE (all + nodes are available) + before a submitted query/statement can be + executed. (Default = 60 seconds) + </description> + </property> + + <property> + <name>compiler.framesize</name> + <value>32KB</value> + </property> + <property> + <name>compiler.sortmemory</name> + <value>320KB</value> + </property> + <property> + <name>compiler.groupmemory</name> + <value>160KB</value> + </property> + <property> + <name>compiler.joinmemory</name> + <value>256KB</value> + </property> + <property> + <name>storage.buffercache.pagesize</name> + <value>32KB</value> + <description>The page size in bytes for pages in the buffer cache. + (Default = "128KB") + </description> + </property> + <property> + <name>storage.buffercache.size</name> + <value>48MB</value> + <description>The size of memory allocated to the disk buffer cache. + The value should be a multiple of the buffer cache page size. + </description> + </property> + <property> + <name>storage.memorycomponent.numpages</name> + <value>32</value> + <description>The number of pages to allocate for a memory component. + This budget is shared by all the memory components of the primary + index and all its secondary indexes across all I/O devices on a node. + Note: in-memory components usually has fill factor of 75% since + the pages are 75% full and the remaining 25% is un-utilized. + </description> + </property> + <property> + <name>storage.memorycomponent.globalbudget</name> + <value>512MB</value> + <description>The size of memory allocated to the memory components. + The value should be a multiple of the memory component page size. + </description> + </property> + <property> + <name>messaging.frame.size</name> + <value>4096</value> + <description>The frame size to be used for NC to NC messaging. (Default = 4kb) + </description> + </property> + <property> + <name>messaging.frame.count</name> + <value>512</value> + <description>Number of reusable frames for NC to NC messaging. (Default = 512) + </description> + </property> + <property> + <name>log.level</name> + <value>INFO</value> + <description>foo</description> + </property> +</asterixConfiguration> http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java index 8002895..ce43bca 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java @@ -207,7 +207,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC if (opTracker != null && opTracker.getNumActiveOperations() == 0 && dsr.getDatasetInfo().getReferenceCount() == 0 && dsr.getDatasetInfo().isOpen() && !dsr.isMetadataDataset()) { - closeDataset(dsr.getDatasetInfo()); + closeDataset(dsr); LOGGER.info(() -> "Evicted Dataset" + dsr.getDatasetID()); return true; } @@ -341,7 +341,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC @Override public synchronized void flushAllDatasets() throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { - flushDatasetOpenIndexes(dsr.getDatasetInfo(), false); + flushDatasetOpenIndexes(dsr, false); } } @@ -349,7 +349,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public synchronized void flushDataset(int datasetId, boolean asyncFlush) throws HyracksDataException { DatasetResource dsr = datasets.get(datasetId); if (dsr != null) { - flushDatasetOpenIndexes(dsr.getDatasetInfo(), asyncFlush); + flushDatasetOpenIndexes(dsr, asyncFlush); } } @@ -385,11 +385,17 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC /* * This method can only be called asynchronously safely if we're sure no modify operation will take place until the flush is scheduled */ - private void flushDatasetOpenIndexes(DatasetInfo dsInfo, boolean asyncFlush) throws HyracksDataException { + private void flushDatasetOpenIndexes(DatasetResource dsr, boolean asyncFlush) throws HyracksDataException { + DatasetInfo dsInfo = dsr.getDatasetInfo(); if (dsInfo.isExternal()) { // no memory components for external dataset return; } + PrimaryIndexOperationTracker primaryOpTracker = dsr.getOpTracker(); + if (primaryOpTracker.getNumActiveOperations() > 0) { + throw new IllegalStateException( + "flushDatasetOpenIndexes is called on a dataset with currently active operations"); + } ILSMComponentIdGenerator idGenerator = getComponentIdGenerator(dsInfo.getDatasetID()); idGenerator.refresh(); @@ -435,11 +441,12 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC } } - private void closeDataset(DatasetInfo dsInfo) throws HyracksDataException { + private void closeDataset(DatasetResource dsr) throws HyracksDataException { // First wait for any ongoing IO operations + DatasetInfo dsInfo = dsr.getDatasetInfo(); dsInfo.waitForIO(); try { - flushDatasetOpenIndexes(dsInfo, false); + flushDatasetOpenIndexes(dsr, false); } catch (Exception e) { throw HyracksDataException.create(e); } @@ -460,7 +467,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public synchronized void closeAllDatasets() throws HyracksDataException { ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); for (DatasetResource dsr : openDatasets) { - closeDataset(dsr.getDatasetInfo()); + closeDataset(dsr); } } @@ -469,7 +476,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC ArrayList<DatasetResource> openDatasets = new ArrayList<>(datasets.values()); for (DatasetResource dsr : openDatasets) { if (!dsr.isMetadataDataset()) { - closeDataset(dsr.getDatasetInfo()); + closeDataset(dsr); } } } @@ -568,7 +575,7 @@ public class DatasetLifecycleManager implements IDatasetLifecycleManager, ILifeC public void flushDataset(IReplicationStrategy replicationStrategy) throws HyracksDataException { for (DatasetResource dsr : datasets.values()) { if (replicationStrategy.isMatch(dsr.getDatasetID())) { - flushDatasetOpenIndexes(dsr.getDatasetInfo(), false); + flushDatasetOpenIndexes(dsr, false); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java index 7bb12c4..8df872b 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-storage-am-lsm-common/src/main/java/org/apache/hyracks/storage/am/lsm/common/api/ILSMIOOperationCallback.java @@ -61,6 +61,7 @@ public interface ILSMIOOperationCallback { * * @param component * @param componentSwitched + * true if the component index was advanced for this recycle, false otherwise */ void recycled(ILSMMemoryComponent component, boolean componentSwitched) throws HyracksDataException; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java index 2989af9..acc3347 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/ITestOpCallback.java @@ -18,8 +18,8 @@ */ package org.apache.hyracks.storage.am.lsm.btree.impl; -import java.util.concurrent.Semaphore; +public interface ITestOpCallback<T> { + void before(T t); -public interface ITestOpCallback { - void callback(Semaphore smeaphore); + void after(); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java new file mode 100644 index 0000000..d5f3fb2 --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/IVirtualBufferCacheCallback.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.btree.impl; + +public interface IVirtualBufferCacheCallback { + void isFullChanged(boolean newValue); +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java index 8782565..bf3bb31 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtree.java @@ -40,6 +40,7 @@ import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexFileManager; import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexOperationContext; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent; import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy; import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; @@ -56,11 +57,17 @@ public class TestLsmBtree extends LSMBTree { private final Semaphore searchSemaphore = new Semaphore(0); private final Semaphore flushSemaphore = new Semaphore(0); private final Semaphore mergeSemaphore = new Semaphore(0); - private final List<ITestOpCallback> modifyCallbacks = new ArrayList<>(); - private final List<ITestOpCallback> searchCallbacks = new ArrayList<>(); - private final List<ITestOpCallback> flushCallbacks = new ArrayList<>(); - private final List<ITestOpCallback> mergeCallbacks = new ArrayList<>(); - private final List<ITestOpCallback> allocateComponentCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Semaphore>> modifyCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Semaphore>> searchCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Semaphore>> flushCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Semaphore>> mergeCallbacks = new ArrayList<>(); + + private final List<ITestOpCallback<ILSMMemoryComponent>> ioAllocateCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<ILSMMemoryComponent>> ioRecycleCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Void>> ioBeforeCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Void>> ioAfterOpCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Void>> ioAfterFinalizeCallbacks = new ArrayList<>(); + private final List<ITestOpCallback<Void>> allocateComponentCallbacks = new ArrayList<>(); private volatile int numScheduledFlushes; private volatile int numStartedFlushes; @@ -89,17 +96,22 @@ public class TestLsmBtree extends LSMBTree { @Override public void modify(IIndexOperationContext ictx, ITupleReference tuple) throws HyracksDataException { synchronized (modifyCallbacks) { - for (ITestOpCallback callback : modifyCallbacks) { + for (ITestOpCallback<Semaphore> callback : modifyCallbacks) { callback(callback, modifySemaphore); } } acquire(modifySemaphore); super.modify(ictx, tuple); + synchronized (modifyCallbacks) { + for (ITestOpCallback<Semaphore> callback : modifyCallbacks) { + callback.after(); + } + } } - public static void callback(ITestOpCallback callback, Semaphore semaphore) { + public static <T> void callback(ITestOpCallback<T> callback, T t) { if (callback != null) { - callback.callback(semaphore); + callback.before(t); } } @@ -121,13 +133,18 @@ public class TestLsmBtree extends LSMBTree { public ILSMDiskComponent doFlush(ILSMIOOperation operation) throws HyracksDataException { numStartedFlushes++; synchronized (flushCallbacks) { - for (ITestOpCallback callback : flushCallbacks) { + for (ITestOpCallback<Semaphore> callback : flushCallbacks) { callback(callback, flushSemaphore); } } acquire(flushSemaphore); ILSMDiskComponent c = super.doFlush(operation); numFinishedFlushes++; + synchronized (flushCallbacks) { + for (ITestOpCallback<Semaphore> callback : flushCallbacks) { + callback.after(); + } + } return c; } @@ -135,13 +152,18 @@ public class TestLsmBtree extends LSMBTree { public ILSMDiskComponent doMerge(ILSMIOOperation operation) throws HyracksDataException { numStartedMerges++; synchronized (mergeCallbacks) { - for (ITestOpCallback callback : mergeCallbacks) { + for (ITestOpCallback<Semaphore> callback : mergeCallbacks) { callback(callback, mergeSemaphore); } } acquire(mergeSemaphore); ILSMDiskComponent c = super.doMerge(operation); numFinishedMerges++; + synchronized (mergeCallbacks) { + for (ITestOpCallback<Semaphore> callback : mergeCallbacks) { + callback.after(); + } + } return c; } @@ -199,11 +221,11 @@ public class TestLsmBtree extends LSMBTree { return numFinishedMerges; } - public List<ITestOpCallback> getModifyCallbacks() { + public List<ITestOpCallback<Semaphore>> getModifyCallbacks() { return modifyCallbacks; } - public void addModifyCallback(ITestOpCallback modifyCallback) { + public void addModifyCallback(ITestOpCallback<Semaphore> modifyCallback) { synchronized (mergeCallbacks) { modifyCallbacks.add(modifyCallback); } @@ -215,7 +237,31 @@ public class TestLsmBtree extends LSMBTree { } } - public List<ITestOpCallback> getSearchCallbacks() { + public void addIoRecycleCallback(ITestOpCallback<ILSMMemoryComponent> callback) { + synchronized (ioRecycleCallbacks) { + ioRecycleCallbacks.add(callback); + } + } + + public void clearIoRecycleCallback() { + synchronized (ioRecycleCallbacks) { + ioRecycleCallbacks.clear(); + } + } + + public void addIoAllocateCallback(ITestOpCallback<ILSMMemoryComponent> callback) { + synchronized (ioAllocateCallbacks) { + ioAllocateCallbacks.add(callback); + } + } + + public void clearIoAllocateCallback() { + synchronized (ioAllocateCallbacks) { + ioAllocateCallbacks.clear(); + } + } + + public List<ITestOpCallback<Semaphore>> getSearchCallbacks() { return searchCallbacks; } @@ -225,13 +271,13 @@ public class TestLsmBtree extends LSMBTree { } } - public void addSearchCallback(ITestOpCallback searchCallback) { + public void addSearchCallback(ITestOpCallback<Semaphore> searchCallback) { synchronized (searchCallbacks) { searchCallbacks.add(searchCallback); } } - public void addFlushCallback(ITestOpCallback flushCallback) { + public void addFlushCallback(ITestOpCallback<Semaphore> flushCallback) { synchronized (flushCallbacks) { flushCallbacks.add(flushCallback); } @@ -243,7 +289,7 @@ public class TestLsmBtree extends LSMBTree { } } - public void addMergeCallback(ITestOpCallback mergeCallback) { + public void addMergeCallback(ITestOpCallback<Semaphore> mergeCallback) { synchronized (mergeCallbacks) { mergeCallbacks.add(mergeCallback); } @@ -259,7 +305,7 @@ public class TestLsmBtree extends LSMBTree { return searchSemaphore; } - public void addAllocateCallback(ITestOpCallback callback) { + public void addAllocateCallback(ITestOpCallback<Void> callback) { synchronized (allocateComponentCallbacks) { allocateComponentCallbacks.add(callback); } @@ -271,13 +317,111 @@ public class TestLsmBtree extends LSMBTree { } } + public void addVirtuablBufferCacheCallback(IVirtualBufferCacheCallback callback) { + for (IVirtualBufferCache vbc : virtualBufferCaches) { + ((TestVirtualBufferCache) vbc).addCallback(callback); + } + } + + public void clearVirtuablBufferCacheCallbacks() { + for (IVirtualBufferCache vbc : virtualBufferCaches) { + ((TestVirtualBufferCache) vbc).clearCallbacks(); + } + } + @Override public void allocateMemoryComponents() throws HyracksDataException { - super.allocateMemoryComponents(); synchronized (allocateComponentCallbacks) { - for (ITestOpCallback callback : allocateComponentCallbacks) { + for (ITestOpCallback<Void> callback : allocateComponentCallbacks) { callback(callback, null); } } + super.allocateMemoryComponents(); + synchronized (allocateComponentCallbacks) { + for (ITestOpCallback<Void> callback : allocateComponentCallbacks) { + callback.after(); + } + } } + + public void beforeIoOperationCalled() { + synchronized (ioBeforeCallbacks) { + for (ITestOpCallback<Void> callback : ioBeforeCallbacks) { + callback.before(null); + } + } + } + + public void beforeIoOperationReturned() { + synchronized (ioBeforeCallbacks) { + for (ITestOpCallback<Void> callback : ioBeforeCallbacks) { + callback.after(); + } + } + } + + public void afterIoOperationCalled() { + synchronized (ioAfterOpCallbacks) { + for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) { + callback.before(null); + } + } + } + + public void afterIoOperationReturned() { + synchronized (ioAfterOpCallbacks) { + for (ITestOpCallback<Void> callback : ioAfterOpCallbacks) { + callback.after(); + } + } + } + + public void afterIoFinalizeCalled() { + synchronized (ioAfterFinalizeCallbacks) { + for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) { + callback.before(null); + } + } + } + + public void afterIoFinalizeReturned() { + synchronized (ioAfterFinalizeCallbacks) { + for (ITestOpCallback<Void> callback : ioAfterFinalizeCallbacks) { + callback.after(); + } + } + } + + public void recycledCalled(ILSMMemoryComponent component) { + synchronized (ioRecycleCallbacks) { + for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) { + callback.before(component); + } + } + } + + public void recycledReturned(ILSMMemoryComponent component) { + synchronized (ioRecycleCallbacks) { + for (ITestOpCallback<ILSMMemoryComponent> callback : ioRecycleCallbacks) { + callback.after(); + } + } + } + + public void allocatedCalled(ILSMMemoryComponent component) { + synchronized (ioAllocateCallbacks) { + for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) { + callback.before(component); + } + } + } + + public void allocatedReturned(ILSMMemoryComponent component) { + synchronized (ioAllocateCallbacks) { + for (ITestOpCallback<ILSMMemoryComponent> callback : ioAllocateCallbacks) { + callback.after(); + } + } + } + } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java index 47a8046..9b53120 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeLocalResource.java @@ -60,6 +60,13 @@ public class TestLsmBtreeLocalResource extends LSMBTreeLocalResource { IIOManager ioManager = serviceCtx.getIoManager(); FileReference file = ioManager.resolve(path); List<IVirtualBufferCache> vbcs = vbcProvider.getVirtualBufferCaches(serviceCtx, file); + for (int i = 0; i < vbcs.size(); i++) { + IVirtualBufferCache vbc = vbcs.get(i); + if (!(vbc instanceof TestVirtualBufferCache)) { + vbcs.remove(i); + vbcs.add(i, new TestVirtualBufferCache(vbc)); + } + } ioOpCallbackFactory.initialize(serviceCtx); return TestLsmBtreeUtil.createLSMTree(ioManager, vbcs, file, storageManager.getBufferCache(serviceCtx), typeTraits, cmpFactories, bloomFilterKeyFields, bloomFilterFalsePositiveRate, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java index d3504ae..45e39aa 100644 --- a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestLsmBtreeSearchCursor.java @@ -19,6 +19,7 @@ package org.apache.hyracks.storage.am.lsm.btree.impl; import java.util.List; +import java.util.concurrent.Semaphore; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeSearchCursor; @@ -36,9 +37,9 @@ public class TestLsmBtreeSearchCursor extends LSMBTreeSearchCursor { @Override public void next() throws HyracksDataException { try { - List<ITestOpCallback> callbacks = lsmBtree.getSearchCallbacks(); + List<ITestOpCallback<Semaphore>> callbacks = lsmBtree.getSearchCallbacks(); synchronized (callbacks) { - for (ITestOpCallback cb : callbacks) { + for (ITestOpCallback<Semaphore> cb : callbacks) { TestLsmBtree.callback(cb, lsmBtree.getSearchSemaphore()); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c5a0a197/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java new file mode 100644 index 0000000..c7e064f --- /dev/null +++ b/hyracks-fullstack/hyracks/hyracks-tests/hyracks-storage-am-lsm-btree-test/src/test/java/org/apache/hyracks/storage/am/lsm/btree/impl/TestVirtualBufferCache.java @@ -0,0 +1,215 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.storage.am.lsm.btree.impl; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.io.FileReference; +import org.apache.hyracks.api.replication.IIOReplicationManager; +import org.apache.hyracks.storage.am.lsm.common.api.IVirtualBufferCache; +import org.apache.hyracks.storage.common.buffercache.ICachedPage; +import org.apache.hyracks.storage.common.buffercache.IExtraPageBlockHelper; +import org.apache.hyracks.storage.common.buffercache.IFIFOPageQueue; +import org.apache.hyracks.storage.common.file.IFileMapManager; + +public class TestVirtualBufferCache implements IVirtualBufferCache { + private final IVirtualBufferCache vbc; + private final AtomicBoolean isFull = new AtomicBoolean(false); + private final List<IVirtualBufferCacheCallback> callbacks; + + public TestVirtualBufferCache(IVirtualBufferCache vbc) { + this.vbc = vbc; + callbacks = new ArrayList<>(); + } + + public void addCallback(IVirtualBufferCacheCallback callback) { + synchronized (callbacks) { + callbacks.add(callback); + } + } + + public void clearCallbacks() { + synchronized (callbacks) { + callbacks.clear(); + } + } + + @Override + public int createFile(FileReference fileRef) throws HyracksDataException { + return vbc.createFile(fileRef); + } + + @Override + public int openFile(FileReference fileRef) throws HyracksDataException { + return vbc.openFile(fileRef); + } + + @Override + public void openFile(int fileId) throws HyracksDataException { + vbc.openFile(fileId); + } + + @Override + public void closeFile(int fileId) throws HyracksDataException { + vbc.closeFile(fileId); + } + + @Override + public void deleteFile(int fileId) throws HyracksDataException { + vbc.deleteFile(fileId); + } + + @Override + public void deleteFile(FileReference file) throws HyracksDataException { + vbc.deleteFile(file); + } + + @Override + public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException { + return vbc.pin(dpid, newPage); + } + + @Override + public void unpin(ICachedPage page) throws HyracksDataException { + vbc.unpin(page); + } + + @Override + public void flush(ICachedPage page) throws HyracksDataException { + vbc.flush(page); + } + + @Override + public void force(int fileId, boolean metadata) throws HyracksDataException { + vbc.force(fileId, metadata); + } + + @Override + public ICachedPage confiscatePage(long dpid) throws HyracksDataException { + return vbc.confiscatePage(dpid); + } + + @Override + public ICachedPage confiscateLargePage(long dpid, int multiplier, int extraBlockPageId) + throws HyracksDataException { + return vbc.confiscateLargePage(dpid, multiplier, extraBlockPageId); + } + + @Override + public void returnPage(ICachedPage page) { + vbc.returnPage(page); + } + + @Override + public void returnPage(ICachedPage page, boolean reinsert) { + vbc.returnPage(page, reinsert); + } + + @Override + public int getPageSize() { + return vbc.getPageSize(); + } + + @Override + public int getPageSizeWithHeader() { + return vbc.getPageSizeWithHeader(); + } + + @Override + public int getPageBudget() { + return vbc.getPageBudget(); + } + + @Override + public int getNumPagesOfFile(int fileId) throws HyracksDataException { + return vbc.getNumPagesOfFile(fileId); + } + + @Override + public int getFileReferenceCount(int fileId) { + return vbc.getFileReferenceCount(fileId); + } + + @Override + public void close() throws HyracksDataException { + vbc.close(); + } + + @Override + public IFIFOPageQueue createFIFOQueue() { + return vbc.createFIFOQueue(); + } + + @Override + public void finishQueue() throws HyracksDataException { + vbc.finishQueue(); + } + + @Override + public boolean isReplicationEnabled() { + return vbc.isReplicationEnabled(); + } + + @Override + public IIOReplicationManager getIOReplicationManager() { + return vbc.getIOReplicationManager(); + } + + @Override + public void purgeHandle(int fileId) throws HyracksDataException { + vbc.purgeHandle(fileId); + } + + @Override + public void resizePage(ICachedPage page, int multiplier, IExtraPageBlockHelper extraPageBlockHelper) + throws HyracksDataException { + vbc.resizePage(page, multiplier, extraPageBlockHelper); + } + + @Override + public void open() throws HyracksDataException { + vbc.open(); + } + + @Override + public boolean isFull() { + boolean newValue = vbc.isFull(); + if (isFull.compareAndSet(!newValue, newValue)) { + synchronized (callbacks) { + for (int i = 0; i < callbacks.size(); i++) { + callbacks.get(i).isFullChanged(newValue); + } + } + } + return newValue; + } + + @Override + public void reset() { + vbc.reset(); + } + + @Override + public IFileMapManager getFileMapProvider() { + return vbc.getFileMapProvider(); + } +}