[NO ISSUE][STO] Introduce Index Checkpoints
- user model changes: no
- storage format changes: yes
- Add index checkpoints.
- Use index checkpoint to determine low watermark
during recovery.
- interface changes: yes
- Introduce IIndexCheckpointManager for managing
index checkpoints.
- Introduce IIndexCheckpointManagerProvider for tracking
IIndexCheckpointManager references.
Details:
- Unify LSM flush/merge operations completion order.
- Introduce index checkpoints which contain:
- Index low watermark.
- Latest valid LSM component
- Mapping between master replica and local replica.
- Use index checkpoints instead of LSM component metadata
for identifying low watermark in recovery.
- Use index checkpoints in replication instead of overwriting
LSN byte offset in replica component metadata.
- Replace LSN_MAP used in replication by index checkpoints.
- Replace NIO Files.find by Commons FileUtils.listFiles to
avoid NoSuchFileException on any file deletion.
Change-Id: Ib22800002bf8ea3660242e599b3f5f20678301a8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2200
Sonar-Qube: Jenkins <[email protected]>
Reviewed-by: abdullah alamoudi <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Till Westmann <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/929344e9
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/929344e9
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/929344e9
Branch: refs/heads/master
Commit: 929344e93049ea133d07fea4d7ff5c9c8e8ed46c
Parents: 3180d87
Author: Murtadha Hubail <[email protected]>
Authored: Thu Dec 7 23:25:41 2017 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Thu Dec 7 15:08:42 2017 -0800
----------------------------------------------------------------------
.../asterix/app/nc/IndexCheckpointManager.java | 192 +++++++++++++++++++
.../app/nc/IndexCheckpointManagerProvider.java | 66 +++++++
.../asterix/app/nc/NCAppRuntimeContext.java | 21 +-
.../apache/asterix/app/nc/RecoveryManager.java | 14 +-
.../TestLsmBtreeIoOpCallbackFactory.java | 12 +-
asterixdb/asterix-common/pom.xml | 12 --
.../common/api/INcApplicationContext.java | 4 +
.../common/context/BaseOperationTracker.java | 7 +-
.../common/context/DatasetLifecycleManager.java | 33 ++--
.../asterix/common/context/DatasetResource.java | 2 +-
.../asterix/common/context/IndexInfo.java | 11 +-
.../context/PrimaryIndexOperationTracker.java | 8 +-
.../asterix/common/dataflow/LSMIndexUtil.java | 7 -
.../AbstractLSMIOOperationCallback.java | 97 ++++++----
...tractLSMIndexIOOperationCallbackFactory.java | 6 +
.../LSMBTreeIOOperationCallback.java | 22 +--
.../LSMBTreeIOOperationCallbackFactory.java | 2 +-
.../LSMBTreeWithBuddyIOOperationCallback.java | 22 +--
...TreeWithBuddyIOOperationCallbackFactory.java | 3 +-
.../LSMInvertedIndexIOOperationCallback.java | 23 +--
...InvertedIndexIOOperationCallbackFactory.java | 3 +-
.../LSMRTreeIOOperationCallback.java | 22 +--
.../LSMRTreeIOOperationCallbackFactory.java | 2 +-
.../storage/DatasetResourceReference.java | 28 +++
.../common/storage/IIndexCheckpointManager.java | 93 +++++++++
.../IIndexCheckpointManagerProvider.java | 40 ++++
.../asterix/common/storage/IndexCheckpoint.java | 98 ++++++++++
.../common/storage/ResourceReference.java | 22 +++
.../asterix/common/utils/StorageConstants.java | 7 +
.../CorrelatedPrefixMergePolicyTest.java | 5 +-
.../AbstractLSMIOOperationCallbackTest.java | 72 ++++---
.../LSMBTreeIOOperationCallbackTest.java | 6 +-
...SMBTreeWithBuddyIOOperationCallbackTest.java | 6 +-
...LSMInvertedIndexIOOperationCallbackTest.java | 6 +-
.../LSMRTreeIOOperationCallbackTest.java | 6 +-
.../installer/test/AsterixLifecycleIT.java | 105 ----------
.../replication/logging/RemoteLogMapping.java | 4 -
.../management/ReplicationChannel.java | 188 +++++++++---------
.../management/ReplicationManager.java | 49 ++---
.../storage/LSMComponentLSNSyncTask.java | 16 +-
.../storage/LSMComponentProperties.java | 5 +-
.../storage/LSMIndexFileProperties.java | 21 +-
.../storage/ReplicaResourcesManager.java | 118 +++---------
.../PersistentLocalResourceRepository.java | 80 +++++---
...ersistentLocalResourceRepositoryFactory.java | 8 +-
.../management/service/logging/LogBuffer.java | 31 +--
.../impls/AbstractLSMIndexFileManager.java | 4 +
.../storage/am/lsm/common/impls/LSMHarness.java | 92 +++++----
48 files changed, 1043 insertions(+), 658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
new file mode 100644
index 0000000..446d04d
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.utils.StorageConstants;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+@ThreadSafe
+public class IndexCheckpointManager implements IIndexCheckpointManager {
+
+ private static final Logger LOGGER =
Logger.getLogger(IndexCheckpointManager.class.getName());
+ private static final int HISTORY_CHECKPOINTS = 1;
+ private static final int MAX_CHECKPOINT_WRITE_ATTEMPTS = 5;
+ private static final FilenameFilter CHECKPOINT_FILE_FILTER =
+ (file, name) ->
name.startsWith(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX);
+ private static final long BULKLOAD_LSN = 0;
+ private final Path indexPath;
+
+ public IndexCheckpointManager(Path indexPath) {
+ this.indexPath = indexPath;
+ }
+
+ @Override
+ public synchronized void init(long lsn) throws HyracksDataException {
+ final List<IndexCheckpoint> checkpoints = getCheckpoints();
+ if (!checkpoints.isEmpty()) {
+ LOGGER.warning(() -> "Checkpoints found on initializing: " +
indexPath);
+ delete();
+ }
+ IndexCheckpoint firstCheckpoint = IndexCheckpoint.first(lsn);
+ persist(firstCheckpoint);
+ }
+
+ @Override
+ public synchronized void replicated(String componentTimestamp, long
masterLsn) throws HyracksDataException {
+ final Long localLsn =
getLatest().getMasterNodeFlushMap().get(masterLsn);
+ if (localLsn == null) {
+ throw new IllegalStateException("Component flushed before lsn
mapping was received");
+ }
+ flushed(componentTimestamp, localLsn);
+ }
+
+ @Override
+ public synchronized void flushed(String componentTimestamp, long lsn)
throws HyracksDataException {
+ final IndexCheckpoint latest = getLatest();
+ IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn,
componentTimestamp);
+ persist(nextCheckpoint);
+ deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
+ }
+
+ @Override
+ public synchronized void masterFlush(long masterLsn, long localLsn) throws
HyracksDataException {
+ final IndexCheckpoint latest = getLatest();
+ latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
+ final IndexCheckpoint next =
+ IndexCheckpoint.next(latest, latest.getLowWatermark(),
latest.getValidComponentTimestamp());
+ persist(next);
+ notifyAll();
+ }
+
+ @Override
+ public synchronized long getLowWatermark() throws HyracksDataException {
+ return getLatest().getLowWatermark();
+ }
+
+ @Override
+ public synchronized boolean isFlushed(long masterLsn) throws
HyracksDataException {
+ if (masterLsn == BULKLOAD_LSN) {
+ return true;
+ }
+ return getLatest().getMasterNodeFlushMap().containsKey(masterLsn);
+ }
+
+ @Override
+ public synchronized void advanceLowWatermark(long lsn) throws
HyracksDataException {
+ flushed(getLatest().getValidComponentTimestamp(), lsn);
+ }
+
+ @Override
+ public synchronized void delete() {
+ deleteHistory(Long.MAX_VALUE, 0);
+ }
+
+ private IndexCheckpoint getLatest() {
+ final List<IndexCheckpoint> checkpoints = getCheckpoints();
+ if (checkpoints.isEmpty()) {
+ throw new IllegalStateException("Couldn't find any checkpoints for
resource: " + indexPath);
+ }
+
checkpoints.sort(Comparator.comparingLong(IndexCheckpoint::getId).reversed());
+ return checkpoints.get(0);
+ }
+
+ private List<IndexCheckpoint> getCheckpoints() {
+ List<IndexCheckpoint> checkpoints = new ArrayList<>();
+ final File[] checkpointFiles =
indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+ if (checkpointFiles != null) {
+ for (File checkpointFile : checkpointFiles) {
+ try {
+ checkpoints.add(read(checkpointFile.toPath()));
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, e, () -> "Couldn't read index
checkpoint file: " + e);
+ }
+ }
+ }
+ return checkpoints;
+ }
+
+ private void persist(IndexCheckpoint checkpoint) throws
HyracksDataException {
+ final Path checkpointPath = getCheckpointPath(checkpoint);
+ for (int i = 1; i <= MAX_CHECKPOINT_WRITE_ATTEMPTS; i++) {
+ try {
+ // clean up from previous write failure
+ if (checkpointPath.toFile().exists()) {
+ Files.delete(checkpointPath);
+ }
+ try (BufferedWriter writer =
Files.newBufferedWriter(checkpointPath)) {
+ writer.write(checkpoint.asJson());
+ }
+ // ensure it was written correctly by reading it
+ read(checkpointPath);
+ } catch (IOException e) {
+ if (i == MAX_CHECKPOINT_WRITE_ATTEMPTS) {
+ throw HyracksDataException.create(e);
+ }
+ LOGGER.log(Level.WARNING, e, () -> "Filed to write checkpoint
at: " + indexPath);
+ int nextAttempt = i + 1;
+ LOGGER.info(() -> "Checkpoint write attempt " + nextAttempt +
"/" + MAX_CHECKPOINT_WRITE_ATTEMPTS);
+ }
+ }
+ }
+
+ private IndexCheckpoint read(Path checkpointPath) throws IOException {
+ return IndexCheckpoint.fromJson(new
String(Files.readAllBytes(checkpointPath)));
+ }
+
+ private void deleteHistory(long latestId, int historyToKeep) {
+ try {
+ final File[] checkpointFiles =
indexPath.toFile().listFiles(CHECKPOINT_FILE_FILTER);
+ if (checkpointFiles != null) {
+ for (File checkpointFile : checkpointFiles) {
+ if (getCheckpointIdFromFileName(checkpointFile.toPath()) <
(latestId - historyToKeep)) {
+ Files.delete(checkpointFile.toPath());
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.log(Level.WARNING, e, () -> "Couldn't delete history
checkpoints at " + indexPath);
+ }
+ }
+
+ private Path getCheckpointPath(IndexCheckpoint checkpoint) {
+ return Paths.get(indexPath.toString(),
+ StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX +
String.valueOf(checkpoint.getId()));
+ }
+
+ private long getCheckpointIdFromFileName(Path checkpointPath) {
+ return Long.valueOf(checkpointPath.getFileName().toString()
+
.substring(StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX.length()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..19ad8f6
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManagerProvider.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.io.IIOManager;
+
+public class IndexCheckpointManagerProvider implements
IIndexCheckpointManagerProvider {
+
+ private final Map<ResourceReference, IndexCheckpointManager>
indexCheckpointManagerMap = new HashMap<>();
+ private final IIOManager ioManager;
+
+ public IndexCheckpointManagerProvider(IIOManager ioManager) {
+ this.ioManager = ioManager;
+ }
+
+ @Override
+ public IIndexCheckpointManager get(ResourceReference ref) throws
HyracksDataException {
+ synchronized (indexCheckpointManagerMap) {
+ return indexCheckpointManagerMap.computeIfAbsent(ref,
this::create);
+ }
+ }
+
+ @Override
+ public void close(ResourceReference ref) {
+ synchronized (indexCheckpointManagerMap) {
+ indexCheckpointManagerMap.remove(ref);
+ }
+ }
+
+ private IndexCheckpointManager create(ResourceReference ref) {
+ try {
+ final Path indexPath = getIndexPath(ref);
+ return new IndexCheckpointManager(indexPath);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private Path getIndexPath(ResourceReference indexRef) throws
HyracksDataException {
+ return
ioManager.resolve(indexRef.getRelativePath().toString()).getFile().toPath();
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 5cae2d6..b6bf2df 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -60,6 +60,7 @@ import
org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IStorageSubsystem;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
@@ -142,6 +143,7 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
private final IStorageComponentProvider componentProvider;
private IHyracksClientConnection hcc;
private IStorageSubsystem storageSubsystem;
+ private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
List<AsterixExtension> extensions)
throws AsterixException, InstantiationException,
IllegalAccessException, ClassNotFoundException,
@@ -182,11 +184,11 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
lsmIOScheduler = AsynchronousScheduler.INSTANCE;
metadataMergePolicyFactory = new PrefixMergePolicyFactory();
+ indexCheckpointManagerProvider = new
IndexCheckpointManagerProvider(ioManager);
ILocalResourceRepositoryFactory
persistentLocalResourceRepositoryFactory =
new PersistentLocalResourceRepositoryFactory(ioManager,
getServiceContext().getNodeId(),
- metadataProperties);
-
+ metadataProperties, indexCheckpointManagerProvider);
localResourceRepository =
(PersistentLocalResourceRepository)
persistentLocalResourceRepositoryFactory.createRepository();
@@ -203,11 +205,10 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
}
localResourceRepository.deleteStorageData();
}
-
datasetMemoryManager = new DatasetMemoryManager(storageProperties);
datasetLifecycleManager =
new DatasetLifecycleManager(storageProperties,
localResourceRepository, txnSubsystem.getLogManager(),
- datasetMemoryManager, ioManager.getIODevices().size());
+ datasetMemoryManager, indexCheckpointManagerProvider,
ioManager.getIODevices().size());
final String nodeId = getServiceContext().getNodeId();
final ClusterPartition[] nodePartitions =
metadataProperties.getNodePartitions().get(nodeId);
final Set<Integer> nodePartitionsIds =
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
@@ -220,7 +221,8 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
if
(replicationProperties.isParticipant(getServiceContext().getNodeId())) {
- replicaResourcesManager = new
ReplicaResourcesManager(localResourceRepository, metadataProperties);
+ replicaResourcesManager = new
ReplicaResourcesManager(localResourceRepository, metadataProperties,
+ indexCheckpointManagerProvider);
replicationManager = new ReplicationManager(nodeId,
replicationProperties, replicaResourcesManager,
txnSubsystem.getLogManager(),
asterixAppRuntimeContextProvider);
@@ -229,13 +231,13 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
//LogManager to replicate logs
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
- //PersistentLocalResourceRepository to replicate metadata files
and delete backups on drop index
+ //PersistentLocalResourceRepository to replicated metadata files
and delete backups on drop index
localResourceRepository.setReplicationManager(replicationManager);
/*
* add the partitions that will be replicated in this node as
inactive partitions
*/
- //get nodes which replicate to this node
+ //get nodes which replicated to this node
Set<String> remotePrimaryReplicas =
replicationProperties.getRemotePrimaryReplicasIds(nodeId);
for (String clientId : remotePrimaryReplicas) {
//get the partitions of each client
@@ -529,4 +531,9 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
public IStorageSubsystem getStorageSubsystem() {
return storageSubsystem;
}
+
+ @Override
+ public IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider()
{
+ return indexCheckpointManagerProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index e29e3fe..f0ed5e9 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -43,11 +43,14 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.common.api.IDatasetLifecycleManager;
+import org.apache.asterix.common.api.INcApplicationContext;
import org.apache.asterix.common.config.ReplicationProperties;
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.Checkpoint;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.ICheckpointManager;
@@ -293,6 +296,8 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
IAppRuntimeContextProvider appRuntimeContext =
txnSubsystem.getAsterixAppRuntimeContextProvider();
IDatasetLifecycleManager datasetLifecycleManager =
appRuntimeContext.getDatasetLifecycleManager();
+ final IIndexCheckpointManagerProvider indexCheckpointManagerProvider =
+ ((INcApplicationContext)
(serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider();
Map<Long, LocalResource> resourcesMap =
localResourceRepository.loadAndGetAllResources();
Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
@@ -356,18 +361,15 @@ public class RecoveryManager implements IRecoveryManager,
ILifeCycleComponent {
index = (ILSMIndex)
localResourceMetadata.createInstance(serviceCtx);
datasetLifecycleManager.register(localResource.getPath(), index);
datasetLifecycleManager.open(localResource.getPath());
-
- //#. get maxDiskLastLSN
- ILSMIndex lsmIndex = index;
try {
+ final DatasetResourceReference
resourceReference =
+
DatasetResourceReference.of(localResource);
maxDiskLastLsn =
-
((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback())
-
.getComponentLSN(lsmIndex.getDiskComponents());
+
indexCheckpointManagerProvider.get(resourceReference).getLowWatermark();
} catch (HyracksDataException e) {
datasetLifecycleManager.close(localResource.getPath());
throw e;
}
-
//#. set resourceId and maxDiskLastLSN to
the map
resourceId2MaxLSNMap.put(resourceId,
maxDiskLastLsn);
} else {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
index fea6cd8..4bfc581 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/dataflow/TestLsmBtreeIoOpCallbackFactory.java
@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallback;
import
org.apache.asterix.common.ioopcallbacks.LSMBTreeIOOperationCallbackFactory;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.storage.am.lsm.btree.impl.TestLsmBtree;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponent;
@@ -58,7 +60,7 @@ public class TestLsmBtreeIoOpCallbackFactory extends
LSMBTreeIOOperationCallback
// Whenever this is called, it resets the counter
// However, the counters for the failed operations are never reset
since we expect them
// To be always 0
- return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator());
+ return new TestLsmBtreeIoOpCallback(index, getComponentIdGenerator(),
getIndexCheckpointManagerProvider());
}
public int getTotalFlushes() {
@@ -100,8 +102,9 @@ public class TestLsmBtreeIoOpCallbackFactory extends
LSMBTreeIOOperationCallback
public class TestLsmBtreeIoOpCallback extends LSMBTreeIOOperationCallback {
private final TestLsmBtree lsmBtree;
- public TestLsmBtreeIoOpCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
+ public TestLsmBtreeIoOpCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(index, idGenerator, checkpointManagerProvider);
lsmBtree = (TestLsmBtree) index;
}
@@ -121,7 +124,8 @@ public class TestLsmBtreeIoOpCallbackFactory extends
LSMBTreeIOOperationCallback
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent
newComponent) {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent
newComponent)
+ throws HyracksDataException {
lsmBtree.afterIoFinalizeCalled();
super.afterFinalize(opType, newComponent);
synchronized (TestLsmBtreeIoOpCallbackFactory.this) {
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/pom.xml b/asterixdb/asterix-common/pom.xml
index ad83d60..b909e91 100644
--- a/asterixdb/asterix-common/pom.xml
+++ b/asterixdb/asterix-common/pom.xml
@@ -225,18 +225,10 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-btree</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-bloomfilter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-rtree</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-api</artifactId>
</dependency>
<dependency>
@@ -245,10 +237,6 @@
</dependency>
<dependency>
<groupId>org.apache.hyracks</groupId>
- <artifactId>hyracks-storage-am-lsm-invertedindex</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hyracks</groupId>
<artifactId>hyracks-storage-am-lsm-rtree</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 162e693..0503c09 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -28,7 +28,9 @@ import
org.apache.asterix.common.replication.IRemoteRecoveryManager;
import org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -118,4 +120,6 @@ public interface INcApplicationContext extends
IApplicationContext {
INCServiceContext getServiceContext();
IStorageSubsystem getStorageSubsystem();
+
+ IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
index 9f57981..9ec13ef 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/BaseOperationTracker.java
@@ -46,7 +46,7 @@ public class BaseOperationTracker implements
ITransactionOperationTracker {
@Override
public void afterOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws
HyracksDataException {
- if (opType == LSMOperationType.FLUSH || opType ==
LSMOperationType.REPLICATE) {
+ if (opType == LSMOperationType.REPLICATE) {
dsInfo.undeclareActiveIOOperation();
}
}
@@ -54,14 +54,11 @@ public class BaseOperationTracker implements
ITransactionOperationTracker {
@Override
public void completeOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws
HyracksDataException {
- if (opType == LSMOperationType.MERGE) {
+ if (opType == LSMOperationType.FLUSH || opType ==
LSMOperationType.MERGE) {
dsInfo.undeclareActiveIOOperation();
}
}
- public void exclusiveJobCommitted() throws HyracksDataException {
- }
-
@Override
public void beforeTransaction(long resourceId) {
/*
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
index ce43bca..6a1ebfb 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetLifecycleManager.java
@@ -35,6 +35,8 @@ import
org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.ACIDException;
import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.DatasetResourceReference;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.asterix.common.transactions.ILogManager;
import org.apache.asterix.common.transactions.LogRecord;
import org.apache.asterix.common.utils.StoragePathUtil;
@@ -64,13 +66,16 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
private final LogRecord logRecord;
private final int numPartitions;
private volatile boolean stopped = false;
+ private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
public DatasetLifecycleManager(StorageProperties storageProperties,
ILocalResourceRepository resourceRepository,
- ILogManager logManager, IDatasetMemoryManager memoryManager, int
numPartitions) {
+ ILogManager logManager, IDatasetMemoryManager memoryManager,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider,
int numPartitions) {
this.logManager = logManager;
this.storageProperties = storageProperties;
this.resourceRepository = resourceRepository;
this.memoryManager = memoryManager;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
this.numPartitions = numPartitions;
logRecord = new LogRecord();
}
@@ -149,12 +154,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
// TODO: use fine-grained counters, one for each index instead of a single counter per dataset.
DatasetInfo dsInfo = dsr.getDatasetInfo();
dsInfo.waitForIO();
- if (iInfo.isOpen()) {
- ILSMOperationTracker indexOpTracker =
iInfo.getIndex().getOperationTracker();
- synchronized (indexOpTracker) {
- iInfo.getIndex().deactivate(false);
- }
- }
+ closeIndex(iInfo);
dsInfo.getIndexes().remove(resourceID);
if (dsInfo.getReferenceCount() == 0 && dsInfo.isOpen() &&
dsInfo.getIndexes().isEmpty()
&& !dsInfo.isExternal()) {
@@ -451,13 +451,7 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
throw HyracksDataException.create(e);
}
for (IndexInfo iInfo : dsInfo.getIndexes().values()) {
- if (iInfo.isOpen()) {
- ILSMOperationTracker opTracker =
iInfo.getIndex().getOperationTracker();
- synchronized (opTracker) {
- iInfo.getIndex().deactivate(false);
- }
- iInfo.setOpen(false);
- }
+ closeIndex(iInfo);
}
removeDatasetFromCache(dsInfo.getDatasetID());
dsInfo.setOpen(false);
@@ -579,4 +573,15 @@ public class DatasetLifecycleManager implements
IDatasetLifecycleManager, ILifeC
}
}
}
+
+ private void closeIndex(IndexInfo indexInfo) throws HyracksDataException {
+ if (indexInfo.isOpen()) {
+ ILSMOperationTracker opTracker =
indexInfo.getIndex().getOperationTracker();
+ synchronized (opTracker) {
+ indexInfo.getIndex().deactivate(false);
+ }
+
indexCheckpointManagerProvider.close(DatasetResourceReference.of(indexInfo.getLocalResource()));
+ indexInfo.setOpen(false);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
index f6e2b0d..c02de7e 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/DatasetResource.java
@@ -108,7 +108,7 @@ public class DatasetResource implements
Comparable<DatasetResource> {
if (index == null) {
throw new HyracksDataException("Attempt to register a null index");
}
- datasetInfo.getIndexes().put(resourceID, new IndexInfo(index,
datasetInfo.getDatasetID(), resourceID,
+ datasetInfo.getIndexes().put(resourceID, new IndexInfo(index,
datasetInfo.getDatasetID(), resource,
((DatasetLocalResource)
resource.getResource()).getPartition()));
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
index 9eb5b6c..b094b6f 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/IndexInfo.java
@@ -19,17 +19,20 @@
package org.apache.asterix.common.context;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
+import org.apache.hyracks.storage.common.LocalResource;
public class IndexInfo extends Info {
private final ILSMIndex index;
private final int datasetId;
private final long resourceId;
private final int partition;
+ private final LocalResource localResource;
- public IndexInfo(ILSMIndex index, int datasetId, long resourceId, int
partition) {
+ public IndexInfo(ILSMIndex index, int datasetId, LocalResource
localResource, int partition) {
this.index = index;
this.datasetId = datasetId;
- this.resourceId = resourceId;
+ this.localResource = localResource;
+ this.resourceId = localResource.getId();
this.partition = partition;
}
@@ -48,4 +51,8 @@ public class IndexInfo extends Info {
public int getDatasetId() {
return datasetId;
}
+
+ public LocalResource getLocalResource() {
+ return localResource;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
index ababe9c..14e91ba 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/context/PrimaryIndexOperationTracker.java
@@ -73,7 +73,7 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker {
public void afterOperation(ILSMIndex index, LSMOperationType opType,
ISearchOperationCallback searchCallback,
IModificationOperationCallback modificationCallback) throws
HyracksDataException {
// Searches are immediately considered complete, because they should not prevent the execution of flushes.
- if (opType == LSMOperationType.FLUSH || opType ==
LSMOperationType.REPLICATE) {
+ if (opType == LSMOperationType.REPLICATE) {
completeOperation(index, opType, searchCallback,
modificationCallback);
}
}
@@ -160,12 +160,6 @@ public class PrimaryIndexOperationTracker extends
BaseOperationTracker {
flushLogCreated = false;
}
- @Override
- public void exclusiveJobCommitted() throws HyracksDataException {
- numActiveOperations.set(0);
- flushIfRequested();
- }
-
public int getNumActiveOperations() {
return numActiveOperations.get();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
index 04090bb..e844192 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/dataflow/LSMIndexUtil.java
@@ -41,11 +41,4 @@ public class LSMIndexUtil {
}
}
}
-
- public static long getComponentFileLSNOffset(ILSMIndex lsmIndex,
ILSMDiskComponent lsmComponent,
- String componentFilePath) throws HyracksDataException {
- AbstractLSMIOOperationCallback ioOpCallback =
- (AbstractLSMIOOperationCallback)
lsmIndex.getIOOperationCallback();
- return ioOpCallback.getComponentFileLSNOffset(lsmComponent,
componentFilePath);
- }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
index 1432f25..c625988 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIOOperationCallback.java
@@ -19,8 +19,13 @@
package org.apache.asterix.common.ioopcallbacks;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
+import org.apache.asterix.common.storage.ResourceReference;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.data.std.primitive.LongPointable;
import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
@@ -33,6 +38,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperation.LSMIOOperati
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationCallback;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMemoryComponent;
+import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
import org.apache.hyracks.storage.am.lsm.common.impls.DiskComponentMetadata;
import org.apache.hyracks.storage.am.lsm.common.util.ComponentUtils;
import org.apache.hyracks.storage.am.lsm.common.util.LSMComponentIdUtils;
@@ -61,10 +67,14 @@ public abstract class AbstractLSMIOOperationCallback
implements ILSMIOOperationC
protected ILSMComponentId[] nextComponentIds;
protected final ILSMComponentIdGenerator idGenerator;
+ private final IIndexCheckpointManagerProvider
indexCheckpointManagerProvider;
+ private final Map<ILSMComponentId, Long> componentLsnMap = new HashMap<>();
- public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex,
ILSMComponentIdGenerator idGenerator) {
+ public AbstractLSMIOOperationCallback(ILSMIndex lsmIndex,
ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
this.lsmIndex = lsmIndex;
this.idGenerator = idGenerator;
+ this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
int count = lsmIndex.getNumberOfAllMemoryComponents();
mutableLastLSNs = new long[count];
firstLSNs = new long[count];
@@ -104,42 +114,59 @@ public abstract class AbstractLSMIOOperationCallback
implements ILSMIOOperationC
public void afterOperation(LSMIOOperationType opType, List<ILSMComponent>
oldComponents,
ILSMDiskComponent newComponent) throws HyracksDataException {
//TODO: Copying Filters and all content of the metadata pages for flush operation should be done here
- if (newComponent != null) {
- putLSNIntoMetadata(newComponent, oldComponents);
- putComponentIdIntoMetadata(opType, newComponent, oldComponents);
- if (opType == LSMIOOperationType.MERGE) {
- // In case of merge, oldComponents are never null
- LongPointable markerLsn =
-
LongPointable.FACTORY.createPointable(ComponentUtils.getLong(oldComponents.get(0).getMetadata(),
- ComponentUtils.MARKER_LSN_KEY,
ComponentUtils.NOT_FOUND));
- newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY,
markerLsn);
- } else if (opType == LSMIOOperationType.FLUSH) {
- // advance memory component indexes
- synchronized (this) {
- // we've already consumed the specified LSN/component id.
- // Now we can advance to the next component
- flushRequested[readIndex] = false;
- // if the component which just finished flushing is the component that will be modified next,
- // we set its first LSN to its previous LSN
- if (readIndex == writeIndex) {
- firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
- }
- readIndex = (readIndex + 1) % mutableLastLSNs.length;
+ if (newComponent == null) {
+ // failed operation. Nothing to do.
+ return;
+ }
+ putLSNIntoMetadata(newComponent, oldComponents);
+ putComponentIdIntoMetadata(opType, newComponent, oldComponents);
+ componentLsnMap.put(newComponent.getId(),
getComponentLSN(oldComponents));
+ if (opType == LSMIOOperationType.MERGE) {
+ if (oldComponents == null) {
+ throw new IllegalStateException("Merge must have old components");
+ }
+ LongPointable markerLsn =
LongPointable.FACTORY.createPointable(ComponentUtils
+ .getLong(oldComponents.get(0).getMetadata(),
ComponentUtils.MARKER_LSN_KEY,
+ ComponentUtils.NOT_FOUND));
+ newComponent.getMetadata().put(ComponentUtils.MARKER_LSN_KEY,
markerLsn);
+ } else if (opType == LSMIOOperationType.FLUSH) {
+ // advance memory component indexes
+ synchronized (this) {
+ // we've already consumed the specified LSN/component id.
+ // Now we can advance to the next component
+ flushRequested[readIndex] = false;
+ // if the component which just finished flushing is the component that will be modified next,
+ // we set its first LSN to its previous LSN
+ if (readIndex == writeIndex) {
+ firstLSNs[writeIndex] = mutableLastLSNs[writeIndex];
}
+ readIndex = (readIndex + 1) % mutableLastLSNs.length;
}
}
}
@Override
- public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent
newComponent) {
+ public void afterFinalize(LSMIOOperationType opType, ILSMDiskComponent
newComponent) throws HyracksDataException {
// The operation was complete and the next I/O operation for the LSM index didn't start yet
if (opType == LSMIOOperationType.FLUSH) {
hasFlushed = true;
+ if (newComponent != null) {
+ final Long lsn = componentLsnMap.remove(newComponent.getId());
+ if (lsn == null) {
+ throw new IllegalStateException("Unidentified flushed component: " + newComponent);
+ }
+ // empty component doesn't have any files
+ final Optional<String> componentFile =
newComponent.getLSMComponentPhysicalFiles().stream().findAny();
+ if (componentFile.isPresent()) {
+ final ResourceReference ref =
ResourceReference.of(componentFile.get());
+ final String componentEndTime =
AbstractLSMIndexFileManager.getComponentEndTime(ref.getName());
+
indexCheckpointManagerProvider.get(ref).flushed(componentEndTime, lsn);
+ }
+ }
}
-
}
- public void putLSNIntoMetadata(ILSMDiskComponent newComponent,
List<ILSMComponent> oldComponents)
+ private void putLSNIntoMetadata(ILSMDiskComponent newComponent,
List<ILSMComponent> oldComponents)
throws HyracksDataException {
newComponent.getMetadata().put(LSN_KEY,
LongPointable.FACTORY.createPointable(getComponentLSN(oldComponents)));
}
@@ -155,8 +182,8 @@ public abstract class AbstractLSMIOOperationCallback
implements ILSMIOOperationC
if (mergedComponents == null || mergedComponents.isEmpty()) {
return null;
}
- return LSMComponentIdUtils.union(mergedComponents.get(0).getId(),
- mergedComponents.get(mergedComponents.size() - 1).getId());
+ return LSMComponentIdUtils
+ .union(mergedComponents.get(0).getId(),
mergedComponents.get(mergedComponents.size() - 1).getId());
}
@@ -186,7 +213,7 @@ public abstract class AbstractLSMIOOperationCallback
implements ILSMIOOperationC
}
}
- public void setFirstLSN(long firstLSN) {
+ public synchronized void setFirstLSN(long firstLSN) {
// We make sure that this method is only called on an empty component so the first LSN is not set incorrectly
firstLSNs[writeIndex] = firstLSN;
}
@@ -212,8 +239,7 @@ public abstract class AbstractLSMIOOperationCallback
implements ILSMIOOperationC
// Implies a flush IO operation. --> moves the flush pointer
// Flush operation of an LSM index are executed sequentially.
synchronized (this) {
- long lsn = mutableLastLSNs[readIndex];
- return lsn;
+ return mutableLastLSNs[readIndex];
}
}
// Get max LSN from the diskComponents. Implies a merge IO operation or Recovery operation.
@@ -246,15 +272,4 @@ public abstract class AbstractLSMIOOperationCallback
implements ILSMIOOperationC
component.resetId(componentId);
}
}
-
- /**
- * @param component
- * @param componentFilePath
- * @return The LSN byte offset in the LSM disk component if the index is
valid,
- * otherwise {@link IMetadataPageManager#INVALID_LSN_OFFSET}.
- * @throws HyracksDataException
- */
- public abstract long getComponentFileLSNOffset(ILSMDiskComponent
component, String componentFilePath)
- throws HyracksDataException;
-
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
index 5dff7f4..ed56ab1 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/AbstractLSMIndexIOOperationCallbackFactory.java
@@ -21,6 +21,8 @@ package org.apache.asterix.common.ioopcallbacks;
import java.io.ObjectStreamException;
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.api.application.INCServiceContext;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentId;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
@@ -49,6 +51,10 @@ public abstract class
AbstractLSMIndexIOOperationCallbackFactory implements ILSM
return idGeneratorFactory.getComponentIdGenerator(ncCtx);
}
+ protected IIndexCheckpointManagerProvider
getIndexCheckpointManagerProvider() {
+ return ((INcApplicationContext)
ncCtx.getApplicationContext()).getIndexCheckpointManagerProvider();
+ }
+
private void readObjectNoData() throws ObjectStreamException {
idGeneratorFactory = new ILSMComponentIdGeneratorFactory() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
index c1ee03b..db6c609 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class LSMBTreeIOOperationCallback extends
AbstractLSMIOOperationCallback {
- public LSMBTreeIOOperationCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent,
String diskComponentFilePath)
- throws HyracksDataException {
- if (diskComponentFilePath.endsWith(LSMBTreeFileManager.BTREE_SUFFIX)) {
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) ((BTree)
diskComponent.getIndex()).getPageManager();
- return
metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(),
LSN_KEY);
- }
- return INVALID;
+ public LSMBTreeIOOperationCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(index, idGenerator, checkpointManagerProvider);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
index 4ef12ef..95245cb 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@ public class LSMBTreeIOOperationCallbackFactory extends
AbstractLSMIndexIOOperat
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMBTreeIOOperationCallback(index,
getComponentIdGenerator());
+ return new LSMBTreeIOOperationCallback(index,
getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
index b43fb2f..da1446b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallback.java
@@ -18,28 +18,14 @@
*/
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.btree.impls.BTree;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
-import
org.apache.hyracks.storage.am.lsm.btree.impls.LSMBTreeWithBuddyFileManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
public class LSMBTreeWithBuddyIOOperationCallback extends
AbstractLSMIOOperationCallback {
- public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex,
ILSMComponentIdGenerator idGenerator) {
- super(lsmIndex, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent,
String diskComponentFilePath)
- throws HyracksDataException {
- if
(diskComponentFilePath.endsWith(LSMBTreeWithBuddyFileManager.BTREE_SUFFIX)) {
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) ((BTree)
diskComponent.getIndex()).getPageManager();
- return
metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(),
LSN_KEY);
- }
- return INVALID;
+ public LSMBTreeWithBuddyIOOperationCallback(ILSMIndex lsmIndex,
ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(lsmIndex, idGenerator, checkpointManagerProvider);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
index 6727bf6..6c75ed6 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMBTreeWithBuddyIOOperationCallbackFactory.java
@@ -32,6 +32,7 @@ public class LSMBTreeWithBuddyIOOperationCallbackFactory
extends AbstractLSMInde
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMBTreeWithBuddyIOOperationCallback(index,
getComponentIdGenerator());
+ return new LSMBTreeWithBuddyIOOperationCallback(index,
getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
index 015cd38..3ba9bcd 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallback.java
@@ -19,29 +19,14 @@
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import
org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexDiskComponent;
-import
org.apache.hyracks.storage.am.lsm.invertedindex.impls.LSMInvertedIndexFileManager;
public class LSMInvertedIndexIOOperationCallback extends
AbstractLSMIOOperationCallback {
- public LSMInvertedIndexIOOperationCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent,
String diskComponentFilePath)
- throws HyracksDataException {
- if
(diskComponentFilePath.endsWith(LSMInvertedIndexFileManager.DELETED_KEYS_BTREE_SUFFIX))
{
- LSMInvertedIndexDiskComponent invIndexComponent =
(LSMInvertedIndexDiskComponent) diskComponent;
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager)
invIndexComponent.getBuddyIndex().getPageManager();
- return
metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(),
LSN_KEY);
- }
- return INVALID;
+ public LSMInvertedIndexIOOperationCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvider) {
+ super(index, idGenerator, checkpointManagerProvider);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
index a2712d1..fb73d19 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMInvertedIndexIOOperationCallbackFactory.java
@@ -33,6 +33,7 @@ public class LSMInvertedIndexIOOperationCallbackFactory
extends AbstractLSMIndex
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMInvertedIndexIOOperationCallback(index,
getComponentIdGenerator());
+ return new LSMInvertedIndexIOOperationCallback(index,
getComponentIdGenerator(),
+ getIndexCheckpointManagerProvider());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
index bc79074..f3e80ec 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallback.java
@@ -19,28 +19,14 @@
package org.apache.asterix.common.ioopcallbacks;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.storage.am.common.api.IMetadataPageManager;
+import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMComponentIdGenerator;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMDiskComponent;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.rtree.impls.LSMRTreeFileManager;
-import org.apache.hyracks.storage.am.rtree.impls.RTree;
public class LSMRTreeIOOperationCallback extends
AbstractLSMIOOperationCallback {
- public LSMRTreeIOOperationCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator) {
- super(index, idGenerator);
- }
-
- @Override
- public long getComponentFileLSNOffset(ILSMDiskComponent diskComponent,
String diskComponentFilePath)
- throws HyracksDataException {
- if (diskComponentFilePath.endsWith(LSMRTreeFileManager.RTREE_SUFFIX)) {
- IMetadataPageManager metadataPageManager =
- (IMetadataPageManager) ((RTree)
diskComponent.getIndex()).getPageManager();
- return
metadataPageManager.getFileOffset(metadataPageManager.createMetadataFrame(),
LSN_KEY);
- }
- return INVALID;
+ public LSMRTreeIOOperationCallback(ILSMIndex index,
ILSMComponentIdGenerator idGenerator,
+ IIndexCheckpointManagerProvider checkpointManagerProvodier) {
+ super(index, idGenerator, checkpointManagerProvodier);
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
index 087aaae..94be0bb 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/ioopcallbacks/LSMRTreeIOOperationCallbackFactory.java
@@ -33,6 +33,6 @@ public class LSMRTreeIOOperationCallbackFactory extends
AbstractLSMIndexIOOperat
@Override
public ILSMIOOperationCallback createIoOpCallback(ILSMIndex index) {
- return new LSMRTreeIOOperationCallback(index,
getComponentIdGenerator());
+ return new LSMRTreeIOOperationCallback(index,
getComponentIdGenerator(), getIndexCheckpointManagerProvider());
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
index d05321e..c488b65 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/DatasetResourceReference.java
@@ -28,6 +28,7 @@ public class DatasetResourceReference extends
ResourceReference {
private int datasetId;
private int partitionId;
+ private long resourceId;
private DatasetResourceReference() {
super();
@@ -53,6 +54,10 @@ public class DatasetResourceReference extends
ResourceReference {
return partitionId;
}
+ public long getResourceId() {
+ return resourceId;
+ }
+
private static DatasetResourceReference parse(LocalResource localResource)
{
final DatasetResourceReference datasetResourceReference = new
DatasetResourceReference();
final String filePath = Paths.get(localResource.getPath(),
StorageConstants.METADATA_FILE_NAME).toString();
@@ -73,5 +78,28 @@ public class DatasetResourceReference extends
ResourceReference {
final DatasetLocalResource dsResource = (DatasetLocalResource)
localResource.getResource();
lrr.datasetId = dsResource.getDatasetId();
lrr.partitionId = dsResource.getPartition();
+ lrr.resourceId = localResource.getId();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o != null && o instanceof ResourceReference) {
+ ResourceReference that = (ResourceReference) o;
+ return
getRelativePath().toString().equals(that.getRelativePath().toString());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getRelativePath().toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getRelativePath().toString();
}
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
new file mode 100644
index 0000000..afa3823
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManager {
+
+ /**
+ * Initializes the first checkpoint of an index with low watermark {@code
lsn}
+ *
+ * @param lsn
+ * @throws HyracksDataException
+ */
+ void init(long lsn) throws HyracksDataException;
+
+ /**
+ * Called when a new LSM disk component is flushed. When called, the
index checkpoint is updated
+ * with the latest valid {@code componentTimestamp} and low watermark
{@code lsn}
+ *
+ * @param componentTimestamp
+ * @param lsn
+ * @throws HyracksDataException
+ */
+ void flushed(String componentTimestamp, long lsn) throws
HyracksDataException;
+
+ /**
+ * Called when a new LSM disk component is replicated from master. When
called, the index checkpoint is updated
+ * with the latest valid {@code componentTimestamp} and the local lsn
mapping of {@code masterLsn} is set as the
+ * new low watermark.
+ *
+ * @param componentTimestamp
+ * @param masterLsn
+ * @throws HyracksDataException
+ */
+ void replicated(String componentTimestamp, long masterLsn) throws
HyracksDataException;
+
+ /**
+ * Called when a flush log is received and replicated from master. The
mapping between
+ * {@code masterLsn} and {@code localLsn} is updated in the checkpoint.
+ *
+ * @param masterLsn
+ * @param localLsn
+ * @throws HyracksDataException
+ */
+ void masterFlush(long masterLsn, long localLsn) throws
HyracksDataException;
+
+ /**
+ * The index low watermark
+ *
+ * @return The low watermark
+ * @throws HyracksDataException
+ */
+ long getLowWatermark() throws HyracksDataException;
+
+ /**
+ * True if a mapping exists between {@code masterLsn} and a localLsn.
Otherwise false.
+ *
+ * @param masterLsn
+ * @return True if the mapping exists. Otherwise false.
+ * @throws HyracksDataException
+ */
+ boolean isFlushed(long masterLsn) throws HyracksDataException;
+
+ /**
+ * Advance the index low watermark to {@code lsn}
+ *
+ * @param lsn
+ * @throws HyracksDataException
+ */
+ void advanceLowWatermark(long lsn) throws HyracksDataException;
+
+ /**
+ * Deletes all checkpoints
+ */
+ void delete();
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
new file mode 100644
index 0000000..e6cef57
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManagerProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public interface IIndexCheckpointManagerProvider {
+
+ /**
+ * Gets {@link IIndexCheckpointManager} for the index referenced by {@code
ref}
+ *
+ * @param ref
+ * @return The index checkpoint manager.
+ * @throws HyracksDataException
+ */
+ IIndexCheckpointManager get(ResourceReference ref) throws
HyracksDataException;
+
+ /**
+ * Closes any resources used by the index checkpoint manager referenced by
{@code ref}
+ *
+ * @param ref
+ */
+ void close(ResourceReference ref);
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
new file mode 100644
index 0000000..6e845e1
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+public class IndexCheckpoint {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+ private static final long INITIAL_CHECKPOINT_ID = 0;
+ private long id;
+ private String validComponentTimestamp;
+ private long lowWatermark;
+ private Map<Long, Long> masterNodeFlushMap;
+
+ public static IndexCheckpoint first(long lowWatermark) {
+ IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
+ firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
+ firstCheckpoint.lowWatermark = lowWatermark;
+ firstCheckpoint.validComponentTimestamp = null;
+ firstCheckpoint.masterNodeFlushMap = new HashMap<>();
+ return firstCheckpoint;
+ }
+
+ public static IndexCheckpoint next(IndexCheckpoint latest, long
lowWatermark, String validComponentTimestamp) {
+ if (lowWatermark < latest.getLowWatermark()) {
+ throw new IllegalStateException("Low watermark should always be
increasing");
+ }
+ IndexCheckpoint next = new IndexCheckpoint();
+ next.id = latest.getId() + 1;
+ next.lowWatermark = lowWatermark;
+ next.validComponentTimestamp = validComponentTimestamp;
+ next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
+ // remove any lsn from the map that won't be used anymore
+ next.masterNodeFlushMap.values().removeIf(lsn -> lsn <= lowWatermark);
+ return next;
+ }
+
+ @JsonCreator
+ private IndexCheckpoint() {
+ }
+
+ public String getValidComponentTimestamp() {
+ return validComponentTimestamp;
+ }
+
+ public long getLowWatermark() {
+ return lowWatermark;
+ }
+
+ public Map<Long, Long> getMasterNodeFlushMap() {
+ return masterNodeFlushMap;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public String asJson() throws HyracksDataException {
+ try {
+ return OBJECT_MAPPER.writeValueAsString(this);
+ } catch (JsonProcessingException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ public static IndexCheckpoint fromJson(String json) throws
HyracksDataException {
+ try {
+ return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index bd057fa..4aa6982 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -107,4 +107,26 @@ public class ResourceReference {
ref.root = tokens[--offset];
ref.rebalance = String.valueOf(0);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o != null && o instanceof ResourceReference) {
+ ResourceReference that = (ResourceReference) o;
+ return
getRelativePath().toString().equals(that.getRelativePath().toString());
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return getRelativePath().toString().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return getRelativePath().toString();
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 6262f71..220b089 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -19,6 +19,7 @@
package org.apache.asterix.common.utils;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
+import
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
/**
* A static class that stores storage constants
@@ -27,6 +28,12 @@ public class StorageConstants {
public static final String STORAGE_ROOT_DIR_NAME = "storage";
public static final String PARTITION_DIR_PREFIX = "partition_";
+ /**
+ * Any file that shares the same directory as the LSM index files must
+ * begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try
to
+ * use it as an index file.
+ */
+ public static final String INDEX_CHECKPOINT_FILE_PREFIX =
".idx_checkpoint_";
public static final String METADATA_FILE_NAME = ".metadata";
public static final String LEGACY_DATASET_INDEX_NAME_SEPARATOR = "_idx_";
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/929344e9/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
index 2928d90..3aa7b17 100644
---
a/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
+++
b/asterixdb/asterix-common/src/test/java/org/apache/asterix/test/context/CorrelatedPrefixMergePolicyTest.java
@@ -41,6 +41,7 @@ import
org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
import org.apache.hyracks.storage.am.lsm.common.api.ILSMMergePolicy;
import org.apache.hyracks.storage.am.lsm.common.impls.LSMComponentId;
import org.apache.hyracks.storage.common.IIndexAccessParameters;
+import org.apache.hyracks.storage.common.LocalResource;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@@ -237,8 +238,8 @@ public class CorrelatedPrefixMergePolicyTest extends
TestCase {
Mockito.when(index.createAccessor(Mockito.any(IIndexAccessParameters.class))).thenReturn(accessor);
Mockito.when(index.isPrimaryIndex()).thenReturn(isPrimary);
-
- return new IndexInfo(index, DATASET_ID, 0, partition);
+ final LocalResource localResource = Mockito.mock(LocalResource.class);
+ return new IndexInfo(index, DATASET_ID, localResource, partition);
}
}