Repository: asterixdb Updated Branches: refs/heads/master 5a61b2ada -> c47773778
[NO ISSUE][OTR] Remove AppRuntimeContextProviderForRecovery - user model changes: no - storage format changes: no - interface changes: yes - Remove IAppRuntimeContextProvider Details: AppRuntimeContextProviderForRecovery is not needed since it has reference to ApplicationContext. ApplicationContext itself has all required references by other classes. Change-Id: I264b86b1bfff37c137936f620745025f0fb837ad Reviewed-on: https://asterix-gerrit.ics.uci.edu/2265 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: abdullah alamoudi <bamou...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/c4777377 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/c4777377 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/c4777377 Branch: refs/heads/master Commit: c47773778f7455c7a31fbccfda915ea3ce62ce5e Parents: 5a61b2a Author: Murtadha Hubail <mhub...@apache.org> Authored: Sun Jan 7 10:50:04 2018 +0300 Committer: Murtadha Hubail <mhub...@apache.org> Committed: Sun Jan 7 11:50:53 2018 -0800 ---------------------------------------------------------------------- .../AppRuntimeContextProviderForRecovery.java | 91 -------------------- .../asterix/app/nc/NCAppRuntimeContext.java | 8 +- .../apache/asterix/app/nc/RecoveryManager.java | 15 ++-- .../asterix/app/nc/TransactionSubsystem.java | 31 +++---- .../IAppRuntimeContextProvider.java | 52 ----------- .../transactions/ITransactionSubsystem.java | 3 +- ...dexModificationOperationCallbackFactory.java | 2 +- ...dexModificationOperationCallbackFactory.java | 2 +- .../UpsertOperationCallbackFactory.java | 2 +- .../management/service/logging/LogManager.java | 26 +++--- .../service/recovery/CheckpointManager.java | 4 +- .../locking/TestRuntimeContextProvider.java | 90 ------------------- 12 files changed, 39 insertions(+), 287 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java deleted file mode 100644 index 18ef143..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AppRuntimeContextProviderForRecovery.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.api.common; - -import java.util.concurrent.ExecutorService; - -import org.apache.asterix.app.nc.NCAppRuntimeContext; -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; -import org.apache.hyracks.storage.common.ILocalResourceRepository; -import org.apache.hyracks.storage.common.buffercache.IBufferCache; - -public class AppRuntimeContextProviderForRecovery implements IAppRuntimeContextProvider { - - private final NCAppRuntimeContext asterixAppRuntimeContext; - - public AppRuntimeContextProviderForRecovery(NCAppRuntimeContext asterixAppRuntimeContext) { - this.asterixAppRuntimeContext = asterixAppRuntimeContext; - } - - @Override - public IBufferCache getBufferCache() { - return asterixAppRuntimeContext.getBufferCache(); - } - - @Override - public ITransactionSubsystem getTransactionSubsystem() { - return asterixAppRuntimeContext.getTransactionSubsystem(); - } - - @Override - public IDatasetLifecycleManager getDatasetLifecycleManager() { - return asterixAppRuntimeContext.getDatasetLifecycleManager(); - } - - @Override - public double getBloomFilterFalsePositiveRate() { - return asterixAppRuntimeContext.getBloomFilterFalsePositiveRate(); - } - - @Override - public ILSMIOOperationScheduler getLSMIOScheduler() { - return asterixAppRuntimeContext.getLSMIOScheduler(); - } - - @Override - public ILocalResourceRepository getLocalResourceRepository() { - return asterixAppRuntimeContext.getLocalResourceRepository(); - } - - @Override - public IIOManager getIOManager() { - return asterixAppRuntimeContext.getIoManager(); - } - - @Override - public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) { - return asterixAppRuntimeContext.getLSMBTreeOperationTracker(datasetID); - } - - @Override - public INcApplicationContext getAppContext() { - return asterixAppRuntimeContext; - } - - @Override - public ExecutorService getThreadExecutor() { - return asterixAppRuntimeContext.getThreadExecutor(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 9c53c18..cb8c161 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -30,7 +30,6 @@ import java.util.concurrent.Executors; import java.util.stream.Collectors; import org.apache.asterix.active.ActiveManager; -import org.apache.asterix.api.common.AppRuntimeContextProviderForRecovery; import org.apache.asterix.common.api.ICoordinationService; import org.apache.asterix.common.api.IDatasetLifecycleManager; import org.apache.asterix.common.api.IDatasetMemoryManager; @@ -58,7 +57,6 @@ import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.IReplicaManager; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.IRecoveryManager; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.common.transactions.ITransactionSubsystem; @@ -69,8 +67,8 @@ import org.apache.asterix.metadata.MetadataNode; import org.apache.asterix.metadata.api.IAsterixStateProxy; import org.apache.asterix.metadata.api.IMetadataNode; import org.apache.asterix.metadata.bootstrap.MetadataBootstrap; -import org.apache.asterix.replication.management.ReplicationManager; import org.apache.asterix.replication.management.ReplicationChannel; +import org.apache.asterix.replication.management.ReplicationManager; import org.apache.asterix.runtime.transaction.GlobalResourceIdFactoryProvider; import org.apache.asterix.runtime.utils.NoOpCoordinationService; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; @@ -188,9 +186,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository(); - IAppRuntimeContextProvider asterixAppRuntimeContextProvider = new AppRuntimeContextProviderForRecovery(this); - txnSubsystem = new TransactionSubsystem(getServiceContext(), getServiceContext().getNodeId(), - asterixAppRuntimeContextProvider, txnProperties); + txnSubsystem = new TransactionSubsystem(this); IRecoveryManager recoveryMgr = txnSubsystem.getRecoveryManager(); SystemState systemState = recoveryMgr.getSystemState(); if (initialRun || systemState == SystemState.PERMANENT_DATA_LOSS) { http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 080ad48..c189983 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -50,7 +50,6 @@ import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.transactions.Checkpoint; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ILogReader; import org.apache.asterix.common.transactions.ILogRecord; @@ -102,7 +101,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { public RecoveryManager(ITransactionSubsystem txnSubsystem, INCServiceContext serviceCtx) { this.serviceCtx = serviceCtx; this.txnSubsystem = txnSubsystem; - this.appCtx = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext(); + this.appCtx = txnSubsystem.getApplicationContext(); logMgr = (LogManager) txnSubsystem.getLogManager(); ReplicationProperties repProperties = appCtx.getReplicationProperties(); replicationEnabled = repProperties.isReplicationEnabled(); @@ -277,8 +276,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { boolean foundWinner = false; JobEntityCommits jobEntityWinners = null; - IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); - IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager(); + IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); final IIndexCheckpointManagerProvider indexCheckpointManagerProvider = ((INcApplicationContext) (serviceCtx.getApplicationContext())).getIndexCheckpointManagerProvider(); @@ -409,8 +407,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public long getLocalMinFirstLSN() throws HyracksDataException { - IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getDatasetLifecycleManager(); + final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources(); long firstLSN; //the min first lsn can only be the current append or smaller @@ -431,8 +428,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private long getRemoteMinFirstLSN() throws HyracksDataException { // find the min first lsn of partitions that are replicated on this node final Set<Integer> allPartitions = localResourceRepository.getAllPartitions(); - final INcApplicationContext appContext = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext(); - final Set<Integer> masterPartitions = appContext.getReplicaManager().getPartitions(); + final Set<Integer> masterPartitions = appCtx.getReplicaManager().getPartitions(); allPartitions.removeAll(masterPartitions); return getPartitionsMinLSN(allPartitions); } @@ -632,8 +628,7 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //undo loserTxn's effect LOGGER.log(Level.INFO, "undoing loser transaction's effect"); - IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getDatasetLifecycleManager(); + final IDatasetLifecycleManager datasetLifecycleManager = appCtx.getDatasetLifecycleManager(); //TODO sort loser entities by smallest LSN to undo in one pass. Iterator<Entry<TxnEntityId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); int undoCount = 0; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index fae0413..8158096 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -19,13 +19,13 @@ package org.apache.asterix.app.nc; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.TransactionProperties; -import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.CheckpointProperties; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; import org.apache.asterix.common.transactions.ICheckpointManager; import org.apache.asterix.common.transactions.ILockManager; import org.apache.asterix.common.transactions.ILogManager; @@ -37,7 +37,6 @@ import org.apache.asterix.transaction.management.service.logging.LogManager; import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication; import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory; import org.apache.asterix.transaction.management.service.transaction.TransactionManager; -import org.apache.hyracks.api.application.INCServiceContext; import org.apache.logging.log4j.Level; import org.apache.logging.log4j.Logger; @@ -52,26 +51,22 @@ public class TransactionSubsystem implements ITransactionSubsystem { private final ILockManager lockManager; private final ITransactionManager transactionManager; private final IRecoveryManager recoveryManager; - private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider; private final TransactionProperties txnProperties; private final ICheckpointManager checkpointManager; + private final INcApplicationContext appCtx; //for profiling purpose private long profilerEntityCommitLogCount = 0; private EntityCommitProfiler ecp; - public TransactionSubsystem(INCServiceContext serviceCtx, String id, - IAppRuntimeContextProvider asterixAppRuntimeContextProvider, TransactionProperties txnProperties) - throws ACIDException { - this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; - this.id = id; - this.txnProperties = txnProperties; + public TransactionSubsystem(INcApplicationContext appCtx) { + this.appCtx = appCtx; + this.id = appCtx.getServiceContext().getNodeId(); + this.txnProperties = appCtx.getTransactionProperties(); this.transactionManager = new TransactionManager(this); this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); - ReplicationProperties repProperties = - asterixAppRuntimeContextProvider.getAppContext().getReplicationProperties(); + final ReplicationProperties repProperties = appCtx.getReplicationProperties(); final boolean replicationEnabled = repProperties.isReplicationEnabled(); - final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id); if (LOGGER.isInfoEnabled()) { LOGGER.log(Level.INFO, "Checkpoint Properties: " + checkpointProperties); @@ -83,10 +78,10 @@ public class TransactionSubsystem implements ITransactionSubsystem { } this.logManager = replicationEnabled ? new LogManagerWithReplication(this) : new LogManager(this); - this.recoveryManager = new RecoveryManager(this, serviceCtx); - if (this.txnProperties.isCommitProfilerEnabled()) { + this.recoveryManager = new RecoveryManager(this, appCtx.getServiceContext()); + if (txnProperties.isCommitProfilerEnabled()) { ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval()); - getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp); + ((ExecutorService) appCtx.getThreadExecutor()).submit(ecp); } } @@ -111,8 +106,8 @@ public class TransactionSubsystem implements ITransactionSubsystem { } @Override - public IAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() { - return asterixAppRuntimeContextProvider; + public INcApplicationContext getApplicationContext() { + return appCtx; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java deleted file mode 100644 index 229fb6d..0000000 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/IAppRuntimeContextProvider.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.common.transactions; - -import java.util.concurrent.ExecutorService; - -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; -import org.apache.hyracks.storage.common.ILocalResourceRepository; -import org.apache.hyracks.storage.common.buffercache.IBufferCache; - -public interface IAppRuntimeContextProvider { - - ExecutorService getThreadExecutor(); - - IBufferCache getBufferCache(); - - ITransactionSubsystem getTransactionSubsystem(); - - IDatasetLifecycleManager getDatasetLifecycleManager(); - - double getBloomFilterFalsePositiveRate(); - - ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID); - - ILSMIOOperationScheduler getLSMIOScheduler(); - - ILocalResourceRepository getLocalResourceRepository(); - - IIOManager getIOManager(); - - INcApplicationContext getAppContext(); -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java index 7bd55e8..3642a71 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/transactions/ITransactionSubsystem.java @@ -18,6 +18,7 @@ */ package org.apache.asterix.common.transactions; +import org.apache.asterix.common.api.INcApplicationContext; import org.apache.asterix.common.config.TransactionProperties; /** @@ -34,7 +35,7 @@ public interface ITransactionSubsystem { IRecoveryManager getRecoveryManager(); - IAppRuntimeContextProvider getAsterixAppRuntimeContextProvider(); + INcApplicationContext getApplicationContext(); String getId(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java index 97fd7ce..d057c50 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallbackFactory.java @@ -59,7 +59,7 @@ public class PrimaryIndexModificationOperationCallbackFactory extends AbstractOp IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IResourceLifecycleManager<IIndex> indexLifeCycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath()); if (index == null) { throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered."); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java index 0c20ee9..26e1b22 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/SecondaryIndexModificationOperationCallbackFactory.java @@ -55,7 +55,7 @@ public class SecondaryIndexModificationOperationCallbackFactory extends Abstract IHyracksTaskContext ctx, IOperatorNodePushable operatorNodePushable) throws HyracksDataException { ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IResourceLifecycleManager indexLifeCycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath()); if (index == null) { throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered."); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java index c2f512f..1449a1b 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/opcallbacks/UpsertOperationCallbackFactory.java @@ -55,7 +55,7 @@ public class UpsertOperationCallbackFactory extends AbstractOperationCallbackFac DatasetLocalResource aResource = (DatasetLocalResource) resource.getResource(); ITransactionSubsystem txnSubsystem = txnSubsystemProvider.getTransactionSubsystem(ctx); IResourceLifecycleManager indexLifeCycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + txnSubsystem.getApplicationContext().getDatasetLifecycleManager(); ILSMIndex index = (ILSMIndex) indexLifeCycleManager.get(resource.getPath()); if (index == null) { throw new HyracksDataException("Index(id:" + resource.getId() + ") is not registered."); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java index 833f8f6..96d0539 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManager.java @@ -34,6 +34,7 @@ import java.util.HashMap; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; @@ -77,7 +78,6 @@ public class LogManager implements ILogManager, ILifeCycleComponent { private final String logFilePrefix; private final MutableLong flushLSN; private final String nodeId; - private final FlushLogsLogger flushLogsLogger; private final HashMap<Long, Integer> txnLogFileId2ReaderCount = new HashMap<>(); private final long logFileSize; private final int logPageSize; @@ -107,7 +107,7 @@ public class LogManager implements ILogManager, ILifeCycleComponent { appendLSN = new AtomicLong(); nodeId = txnSubsystem.getId(); flushLogsQ = new LinkedBlockingQueue<>(); - flushLogsLogger = new FlushLogsLogger(); + txnSubsystem.getApplicationContext().getThreadExecutor().execute(new FlushLogsLogger()); initializeLogManager(SMALLEST_LOG_FILE_ID); } @@ -130,10 +130,8 @@ public class LogManager implements ILogManager, ILifeCycleComponent { } initNewPage(INITIAL_LOG_SIZE); logFlusher = new LogFlusher(this, emptyQ, flushQ, stashQ); - futureLogFlusher = txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(logFlusher); - if (!flushLogsLogger.isAlive()) { - txnSubsystem.getAsterixAppRuntimeContextProvider().getThreadExecutor().execute(flushLogsLogger); - } + futureLogFlusher = + ((ExecutorService) txnSubsystem.getApplicationContext().getThreadExecutor()).submit(logFlusher); } @Override @@ -633,20 +631,20 @@ public class LogManager implements ILogManager, ILifeCycleComponent { /** * This class is used to log FLUSH logs. - * FLUSH logs are flushed on a different thread to avoid a possible deadlock in LogBuffer batchUnlock which calls PrimaryIndexOpeartionTracker.completeOperation - * The deadlock happens when PrimaryIndexOpeartionTracker.completeOperation results in generating a FLUSH log and there are no empty log buffers available to log it. + * FLUSH logs are flushed on a different thread to avoid a possible deadlock in {@link LogBuffer} batchUnlock + * which calls {@link org.apache.asterix.common.context.PrimaryIndexOperationTracker} completeOperation. The + * deadlock happens when completeOperation generates a FLUSH log and there are no empty log buffers available + * to log it. */ - private class FlushLogsLogger extends Thread { + private class FlushLogsLogger implements Runnable { @Override public void run() { - while (true) { + while (!Thread.currentThread().isInterrupted()) { try { - ILogRecord logRecord = flushLogsQ.take(); + final ILogRecord logRecord = flushLogsQ.take(); appendToLogTail(logRecord); - } catch (ACIDException e) { - e.printStackTrace(); } catch (InterruptedException e) { - //ignore + Thread.currentThread().interrupt(); } } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java index 3cb91ff..487bc84 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManager.java @@ -45,7 +45,7 @@ public class CheckpointManager extends AbstractCheckpointManager { @Override public synchronized void doSharpCheckpoint() throws HyracksDataException { LOGGER.info("Starting sharp checkpoint..."); - final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + final IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getApplicationContext() .getDatasetLifecycleManager(); datasetLifecycleManager.flushAllDatasets(); capture(SHARP_CHECKPOINT_LSN, true); @@ -66,7 +66,7 @@ public class CheckpointManager extends AbstractCheckpointManager { boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN; if (!checkpointSucceeded) { // Flush datasets with indexes behind target checkpoint LSN - IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getApplicationContext() .getDatasetLifecycleManager(); datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/c4777377/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java b/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java deleted file mode 100644 index f897aca..0000000 --- a/asterixdb/asterix-transactions/src/test/java/org/apache/asterix/transaction/management/service/locking/TestRuntimeContextProvider.java +++ /dev/null @@ -1,90 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.locking; - -import static org.mockito.Mockito.mock; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.hyracks.api.io.IIOManager; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMOperationTracker; -import org.apache.hyracks.storage.common.ILocalResourceRepository; -import org.apache.hyracks.storage.common.buffercache.IBufferCache; - -class TestRuntimeContextProvider implements IAppRuntimeContextProvider { - - ExecutorService ate = Executors.newCachedThreadPool(Executors.defaultThreadFactory()); - IDatasetLifecycleManager dlcm = mock(IDatasetLifecycleManager.class); - - @Override - public ExecutorService getThreadExecutor() { - return ate; - } - - @Override - public IBufferCache getBufferCache() { - throw new UnsupportedOperationException(); - } - - @Override - public ITransactionSubsystem getTransactionSubsystem() { - throw new UnsupportedOperationException(); - } - - @Override - public IDatasetLifecycleManager getDatasetLifecycleManager() { - return dlcm; - } - - @Override - public double getBloomFilterFalsePositiveRate() { - throw new UnsupportedOperationException(); - } - - @Override - public ILSMOperationTracker getLSMBTreeOperationTracker(int datasetID) { - throw new UnsupportedOperationException(); - } - - @Override - public ILSMIOOperationScheduler getLSMIOScheduler() { - throw new UnsupportedOperationException(); - } - - @Override - public ILocalResourceRepository getLocalResourceRepository() { - throw new UnsupportedOperationException(); - } - - @Override - public IIOManager getIOManager() { - throw new UnsupportedOperationException(); - } - - @Override - public INcApplicationContext getAppContext() { - throw new UnsupportedOperationException(); - } -}