http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java index 9e32b7e..9937479 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCExtensionManager.java @@ -25,17 +25,18 @@ import java.util.List; import org.apache.asterix.common.api.IExtension; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.exceptions.RuntimeDataException; -import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.metadata.api.IMetadataExtension; +import org.apache.asterix.metadata.api.INCExtensionManager; import org.apache.asterix.metadata.entitytupletranslators.MetadataTupleTranslatorProvider; +import org.apache.asterix.utils.ExtensionUtil; +import org.apache.hyracks.api.application.INCApplicationContext; import org.apache.hyracks.api.exceptions.HyracksDataException; /** * AsterixDB's implementation of {@code INCExtensionManager} which takes care of * initializing extensions on Node Controllers */ -public class NCExtensionManager { +public class NCExtensionManager implements INCExtensionManager { private final MetadataTupleTranslatorProvider tupleTranslatorProvider; private final List<IMetadataExtension> mdExtensions; @@ -44,15 +45,19 @@ public class NCExtensionManager { * Initialize {@code CCExtensionManager} from configuration * * @param list + * list of user configured extensions * @throws InstantiationException + * if an extension couldn't be created * @throws IllegalAccessException + * if user doesn't have 
enough access privileges * @throws ClassNotFoundException + * if a class was not found * @throws HyracksDataException + * if two extensions conflict with each other */ public NCExtensionManager(List<AsterixExtension> list) throws InstantiationException, IllegalAccessException, ClassNotFoundException, HyracksDataException { - MetadataTupleTranslatorProvider ttp = null; - IMetadataExtension tupleTranslatorExtension = null; + IMetadataExtension tupleTranslatorProviderExtension = null; mdExtensions = new ArrayList<>(); if (list != null) { for (AsterixExtension extensionConf : list) { @@ -62,31 +67,23 @@ public class NCExtensionManager { case METADATA: IMetadataExtension mde = (IMetadataExtension) extension; mdExtensions.add(mde); - ttp = extendTupleTranslator(ttp, tupleTranslatorExtension, mde); - tupleTranslatorExtension = ttp == null ? null : mde; + tupleTranslatorProviderExtension = + ExtensionUtil.extendTupleTranslatorProvider(tupleTranslatorProviderExtension, mde); break; default: break; } } } - this.tupleTranslatorProvider = ttp == null ? new MetadataTupleTranslatorProvider() : ttp; - } - - private MetadataTupleTranslatorProvider extendTupleTranslator(MetadataTupleTranslatorProvider ttp, - IMetadataExtension tupleTranslatorExtension, IMetadataExtension mde) throws HyracksDataException { - if (ttp != null) { - throw new RuntimeDataException(ErrorCode.EXTENSION_COMPONENT_CONFLICT, - tupleTranslatorExtension.getId(), - mde.getId(), IMetadataExtension.class.getSimpleName()); - } - return mde.getMetadataTupleTranslatorProvider(); + this.tupleTranslatorProvider = tupleTranslatorProviderExtension == null ? 
new MetadataTupleTranslatorProvider() + : tupleTranslatorProviderExtension.getMetadataTupleTranslatorProvider(); } public List<IMetadataExtension> getMetadataExtensions() { return mdExtensions; } + @Override public MetadataTupleTranslatorProvider getMetadataTupleTranslatorProvider() { return tupleTranslatorProvider; } @@ -94,13 +91,15 @@ public class NCExtensionManager { /** * Called on bootstrap of metadata node allowing extensions to instantiate their Metadata artifacts * + * @param ncApplicationContext + * the node controller application context * @throws HyracksDataException */ - public void initializeMetadata() throws HyracksDataException { + public void initializeMetadata(INCApplicationContext appCtx) throws HyracksDataException { if (mdExtensions != null) { for (IMetadataExtension mdExtension : mdExtensions) { try { - mdExtension.initializeMetadata(); + mdExtension.initializeMetadata(appCtx); } catch (RemoteException | ACIDException e) { throw new HyracksDataException(e); }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java new file mode 100644 index 0000000..4edf991 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -0,0 +1,868 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +import org.apache.asterix.common.api.IDatasetLifecycleManager; +import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; +import org.apache.asterix.common.replication.IReplicaResourcesManager; +import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.transactions.ILogReader; +import org.apache.asterix.common.transactions.ILogRecord; +import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.transactions.ITransactionContext; +import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.transactions.LogType; +import org.apache.asterix.common.transactions.Resource; +import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; +import org.apache.asterix.transaction.management.service.logging.LogManager; +import org.apache.asterix.transaction.management.service.recovery.AbstractCheckpointManager; +import 
org.apache.asterix.transaction.management.service.recovery.TxnId; +import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; +import org.apache.commons.io.FileUtils; +import org.apache.hyracks.api.application.INCApplicationContext; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; +import org.apache.hyracks.storage.am.common.api.IIndex; +import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; +import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; +import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; +import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; +import org.apache.hyracks.storage.common.file.LocalResource; + +/** + * This is the Recovery Manager and is responsible for rolling back a + * transaction as well as doing a system recovery. 
+ */ +public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { + + public static final boolean IS_DEBUG_MODE = false; + private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName()); + private final ITransactionSubsystem txnSubsystem; + private final LogManager logMgr; + private final boolean replicationEnabled; + private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp"; + private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null; + private final long cachedEntityCommitsPerJobSize; + private final PersistentLocalResourceRepository localResourceRepository; + private final ICheckpointManager checkpointManager; + private SystemState state; + private final INCApplicationContext appCtx; + + public RecoveryManager(ITransactionSubsystem txnSubsystem, INCApplicationContext appCtx) { + this.appCtx = appCtx; + this.txnSubsystem = txnSubsystem; + logMgr = (LogManager) txnSubsystem.getLogManager(); + replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); + localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem + .getAsterixAppRuntimeContextProvider().getLocalResourceRepository(); + cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize(); + checkpointManager = txnSubsystem.getCheckpointManager(); + } + + /** + * returns system state which could be one of the three states: HEALTHY, RECOVERING, CORRUPTED. + * This state information could be used in a case where more than one thread is running + * in the bootstrap process to provide higher availability. In other words, while the system + * is recovered, another thread may start a new transaction with understanding the side effect + * of the operation, or the system can be recovered concurrently. This kind of concurrency is + * not supported, yet. 
+     */
+    @Override
+    public SystemState getSystemState() throws ACIDException {
+        // The latest checkpoint is the starting point for the whole decision.
+        final Checkpoint latestCheckpoint = checkpointManager.getLatest();
+        if (latestCheckpoint == null) {
+            // No checkpoint file => the failure happened during NC initialization.
+            // Report NEW_UNIVERSE so that the NC initialization is retried.
+            state = SystemState.NEW_UNIVERSE;
+            if (LOGGER.isLoggable(Level.INFO)) {
+                LOGGER.info("The checkpoint file doesn't exist: systemState = NEW_UNIVERSE");
+            }
+            return state;
+        }
+
+        final boolean healthy;
+        if (replicationEnabled) {
+            // Healthy when no logs exist or only remote logs exist; otherwise remote recovery is needed.
+            healthy = latestCheckpoint.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN
+                    || (latestCheckpoint.getCheckpointLsn() == logMgr.getAppendLSN()
+                            && latestCheckpoint.isSharp());
+        } else {
+            final long smallestReadableLsn = logMgr.getReadableSmallestLSN();
+            if (logMgr.getAppendLSN() == smallestReadableLsn) {
+                // Append LSN equals the smallest readable LSN. If the checkpoint's min first LSN is
+                // not the sharp-checkpoint marker, log files were lost; warn and continue anyway.
+                if (latestCheckpoint.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
+                    LOGGER.warning("Some(or all) of transaction log files are lost.");
+                }
+                healthy = true;
+            } else {
+                // Healthy only if the checkpoint LSN is the append LSN and its min first LSN is the
+                // sharp-checkpoint marker; anything else is reported as CORRUPTED.
+                healthy = latestCheckpoint.getCheckpointLsn() == logMgr.getAppendLSN()
+                        && latestCheckpoint.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN;
+            }
+        }
+
+        state = healthy ? SystemState.HEALTHY : SystemState.CORRUPTED;
+        return state;
+    }
+
+    //This method is used only when replication is disabled.
+ @Override + public void startRecovery(boolean synchronous) throws IOException, ACIDException { + state = SystemState.RECOVERING; + LOGGER.log(Level.INFO, "starting recovery ..."); + + long readableSmallestLSN = logMgr.getReadableSmallestLSN(); + Checkpoint checkpointObject = checkpointManager.getLatest(); + long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn(); + if (lowWaterMarkLSN < readableSmallestLSN) { + lowWaterMarkLSN = readableSmallestLSN; + } + + //delete any recovery files from previous failed recovery attempts + deleteRecoveryTemporaryFiles(); + + //get active partitions on this node + Set<Integer> activePartitions = localResourceRepository.getNodeOrignalPartitions(); + replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), lowWaterMarkLSN); + } + + @Override + public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) + throws IOException, ACIDException { + try { + Set<Integer> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN); + startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet); + } finally { + logReader.close(); + deleteRecoveryTemporaryFiles(); + } + } + + private synchronized Set<Integer> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader, + long lowWaterMarkLSN) throws IOException, ACIDException { + int updateLogCount = 0; + int entityCommitLogCount = 0; + int jobCommitLogCount = 0; + int abortLogCount = 0; + Set<Integer> winnerJobSet = new HashSet<>(); + jobId2WinnerEntitiesMap = new HashMap<>(); + //set log reader to the lowWaterMarkLsn + ILogRecord logRecord; + logReader.initializeScan(lowWaterMarkLSN); + logRecord = logReader.next(); + while (logRecord != null) { + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + switch (logRecord.getLogType()) { + case LogType.UPDATE: + if (partitions.contains(logRecord.getResourcePartition())) { + updateLogCount++; + } + 
break; + case LogType.JOB_COMMIT: + winnerJobSet.add(logRecord.getJobId()); + cleanupJobCommits(logRecord.getJobId()); + jobCommitLogCount++; + break; + case LogType.ENTITY_COMMIT: + case LogType.UPSERT_ENTITY_COMMIT: + if (partitions.contains(logRecord.getResourcePartition())) { + analyzeEntityCommitLog(logRecord); + entityCommitLogCount++; + } + break; + case LogType.ABORT: + abortLogCount++; + break; + case LogType.FLUSH: + case LogType.WAIT: + case LogType.MARKER: + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + logRecord = logReader.next(); + } + + //prepare winners for search after analysis is done to flush anything remaining in memory to disk. + for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) { + winners.prepareForSearch(); + } + + LOGGER.info("Logs analysis phase completed."); + LOGGER.info("Analysis log count update/entityCommit/jobCommit/abort = " + updateLogCount + "/" + + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount); + + return winnerJobSet; + } + + private void cleanupJobCommits(int jobId) { + if (jobId2WinnerEntitiesMap.containsKey(jobId)) { + JobEntityCommits jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + //to delete any spilled files as well + jobEntityWinners.clear(); + jobId2WinnerEntitiesMap.remove(jobId); + } + } + + private void analyzeEntityCommitLog(ILogRecord logRecord) throws IOException { + int jobId = logRecord.getJobId(); + JobEntityCommits jobEntityWinners; + if (!jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = new JobEntityCommits(jobId); + if (needToFreeMemory()) { + // If we don't have enough memory for one more job, + // we will force all jobs to spill their cached entities to disk. + // This could happen only when we have many jobs with small + // number of records and none of them have job commit. 
+ freeJobsCachedEntities(jobId); + } + jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners); + } else { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + } + jobEntityWinners.add(logRecord); + } + + private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, + long lowWaterMarkLSN, Set<Integer> winnerJobSet) throws IOException, ACIDException { + int redoCount = 0; + int jobId = -1; + + long resourceId; + long maxDiskLastLsn; + long lsn = -1; + ILSMIndex index = null; + LocalResource localResource = null; + Resource localResourceMetadata = null; + boolean foundWinner = false; + JobEntityCommits jobEntityWinners = null; + + IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); + IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager(); + + Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources(); + Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>(); + TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + + ILogRecord logRecord = null; + try { + logReader.initializeScan(lowWaterMarkLSN); + logRecord = logReader.next(); + while (logRecord != null) { + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + lsn = logRecord.getLSN(); + jobId = logRecord.getJobId(); + foundWinner = false; + switch (logRecord.getLogType()) { + case LogType.UPDATE: + if (partitions.contains(logRecord.getResourcePartition())) { + if (winnerJobSet.contains(jobId)) { + foundWinner = true; + } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) { + jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); + tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize()); + if (jobEntityWinners.containsEntityCommitForTxnId(lsn, tempKeyTxnId)) { + foundWinner = true; + } + } + if (foundWinner) { + resourceId = 
logRecord.getResourceId(); + localResource = resourcesMap.get(resourceId); + /******************************************************************* + * [Notice] + * -> Issue + * Delete index may cause a problem during redo. + * The index operation to be redone couldn't be redone because the corresponding index + * may not exist in NC due to the possible index drop DDL operation. + * -> Approach + * Avoid the problem during redo. + * More specifically, the problem will be detected when the localResource of + * the corresponding index is retrieved, which will end up with 'null'. + * If null is returned, then just go and process the next + * log record. + *******************************************************************/ + if (localResource == null) { + logRecord = logReader.next(); + continue; + } + /*******************************************************************/ + + //get index instance from IndexLifeCycleManager + //if index is not registered into IndexLifeCycleManager, + //create the index using LocalMetadata stored in LocalResourceRepository + //get partition path in this node + localResourceMetadata = (Resource) localResource.getResource(); + index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath()); + if (index == null) { + //#. create index instance and register to indexLifeCycleManager + index = localResourceMetadata.createIndexInstance(appCtx, localResource); + datasetLifecycleManager.register(localResource.getPath(), index); + datasetLifecycleManager.open(localResource.getPath()); + + //#. get maxDiskLastLSN + ILSMIndex lsmIndex = index; + try { + maxDiskLastLsn = + ((AbstractLSMIOOperationCallback) lsmIndex.getIOOperationCallback()) + .getComponentLSN(lsmIndex.getImmutableComponents()); + } catch (HyracksDataException e) { + datasetLifecycleManager.close(localResource.getPath()); + throw e; + } + + //#. 
set resourceId and maxDiskLastLSN to the map + resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); + } else { + maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); + } + + if (lsn > maxDiskLastLsn) { + redo(logRecord, datasetLifecycleManager); + redoCount++; + } + } + } + break; + case LogType.JOB_COMMIT: + case LogType.ENTITY_COMMIT: + case LogType.ABORT: + case LogType.FLUSH: + case LogType.UPSERT_ENTITY_COMMIT: + case LogType.WAIT: + case LogType.MARKER: + //do nothing + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + logRecord = logReader.next(); + } + LOGGER.info("Logs REDO phase completed. Redo logs count: " + redoCount); + } finally { + //close all indexes + Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet(); + for (long r : resourceIdList) { + datasetLifecycleManager.close(resourcesMap.get(r).getPath()); + } + } + } + + private boolean needToFreeMemory() { + return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize; + } + + @Override + public long getMinFirstLSN() throws HyracksDataException { + long minFirstLSN = getLocalMinFirstLSN(); + + //if replication is enabled, consider replica resources min LSN + if (replicationEnabled) { + long remoteMinFirstLSN = getRemoteMinFirstLSN(); + minFirstLSN = Math.min(minFirstLSN, remoteMinFirstLSN); + } + + return minFirstLSN; + } + + @Override + public long getLocalMinFirstLSN() throws HyracksDataException { + IDatasetLifecycleManager datasetLifecycleManager = + txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources(); + long firstLSN; + //the min first lsn can only be the current append or smaller + long minFirstLSN = logMgr.getAppendLSN(); + if (!openIndexList.isEmpty()) { + for (IIndex index : openIndexList) { + AbstractLSMIOOperationCallback ioCallback = + (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback(); + if 
(!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) { + firstLSN = ioCallback.getFirstLSN(); + minFirstLSN = Math.min(minFirstLSN, firstLSN); + } + } + } + return minFirstLSN; + } + + private long getRemoteMinFirstLSN() { + IReplicaResourcesManager remoteResourcesManager = + txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); + return remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions()); + } + + @Override + public File createJobRecoveryFile(int jobId, String fileName) throws IOException { + String recoveryDirPath = getRecoveryDirPath(); + Path jobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId); + if (!Files.exists(jobRecoveryFolder)) { + Files.createDirectories(jobRecoveryFolder); + } + + File jobRecoveryFile = new File(jobRecoveryFolder.toString() + File.separator + fileName); + if (!jobRecoveryFile.exists()) { + if (!jobRecoveryFile.createNewFile()) { + throw new IOException("Failed to create file: " + fileName + " for job id(" + jobId + ")"); + } + } else { + throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists"); + } + return jobRecoveryFile; + } + + @Override + public void deleteRecoveryTemporaryFiles() { + String recoveryDirPath = getRecoveryDirPath(); + Path recoveryFolderPath = Paths.get(recoveryDirPath); + FileUtils.deleteQuietly(recoveryFolderPath.toFile()); + } + + private String getRecoveryDirPath() { + String logDir = logMgr.getLogManagerProperties().getLogDir(); + if (!logDir.endsWith(File.separator)) { + logDir += File.separator; + } + + return logDir + RECOVERY_FILES_DIR_NAME; + } + + private void freeJobsCachedEntities(int requestingJobId) throws IOException { + if (jobId2WinnerEntitiesMap != null) { + for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) { + //if the job is not the requester, free its memory + if 
(jobEntityCommits.getKey() != requestingJobId) { + jobEntityCommits.getValue().spillToDiskAndfreeMemory(); + } + } + } + } + + @Override + public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException { + int abortedJobId = txnContext.getJobId().getId(); + // Obtain the first/last log record LSNs written by the Job + long firstLSN = txnContext.getFirstLSN(); + /** + * The effect of any log record with LSN below minFirstLSN has already been written to disk and + * will not be rolled back. Therefore, we will set the first LSN of the job to the maximum of + * minFirstLSN and the job's first LSN. + */ + try { + long localMinFirstLSN = getLocalMinFirstLSN(); + firstLSN = Math.max(firstLSN, localMinFirstLSN); + } catch (HyracksDataException e) { + throw new ACIDException(e); + } + long lastLSN = txnContext.getLastLSN(); + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("rollbacking transaction log records from " + firstLSN + " to " + lastLSN); + } + // check if the transaction actually wrote some logs. 
+ if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) { + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("no need to roll back as there were no operations by the job " + txnContext.getJobId()); + } + return; + } + + // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); + } + + Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>(); + TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); + int updateLogCount = 0; + int entityCommitLogCount = 0; + int logJobId = -1; + long currentLSN = -1; + TxnId loserEntity = null; + List<Long> undoLSNSet = null; + //get active partitions on this node + Set<Integer> activePartitions = localResourceRepository.getActivePartitions(); + ILogReader logReader = logMgr.getLogReader(false); + try { + logReader.initializeScan(firstLSN); + ILogRecord logRecord = null; + while (currentLSN < lastLSN) { + logRecord = logReader.next(); + if (logRecord == null) { + break; + } else { + currentLSN = logRecord.getLSN(); + + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + } + logJobId = logRecord.getJobId(); + if (logJobId != abortedJobId) { + continue; + } + tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize()); + switch (logRecord.getLogType()) { + case LogType.UPDATE: + if (activePartitions.contains(logRecord.getResourcePartition())) { + undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId); + if (undoLSNSet == null) { + loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize(), true); + undoLSNSet = new LinkedList<>(); + jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet); + } + undoLSNSet.add(currentLSN); + 
updateLogCount++; + if (IS_DEBUG_MODE) { + LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:" + + tempKeyTxnId); + } + } + break; + case LogType.ENTITY_COMMIT: + case LogType.UPSERT_ENTITY_COMMIT: + if (activePartitions.contains(logRecord.getResourcePartition())) { + jobLoserEntity2LSNsMap.remove(tempKeyTxnId); + entityCommitLogCount++; + if (IS_DEBUG_MODE) { + LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + + "]" + tempKeyTxnId); + } + } + break; + case LogType.JOB_COMMIT: + throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort."); + case LogType.ABORT: + case LogType.FLUSH: + case LogType.WAIT: + case LogType.MARKER: + //ignore + break; + default: + throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); + } + } + + if (currentLSN != lastLSN) { + throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN + + ") during abort( " + txnContext.getJobId() + ")"); + } + + //undo loserTxn's effect + LOGGER.log(Level.INFO, "undoing loser transaction's effect"); + + IDatasetLifecycleManager datasetLifecycleManager = + txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + //TODO sort loser entities by smallest LSN to undo in one pass. + Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); + int undoCount = 0; + while (iter.hasNext()) { + Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next(); + undoLSNSet = loserEntity2LSNsMap.getValue(); + // The step below is important since the upsert operations must be done in reverse order. + Collections.reverse(undoLSNSet); + for (long undoLSN : undoLSNSet) { + //here, all the log records are UPDATE type. So, we don't need to check the type again. + //read the corresponding log record to be undone. 
+ logRecord = logReader.read(undoLSN); + if (logRecord == null) { + throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")"); + } + if (IS_DEBUG_MODE) { + LOGGER.info(logRecord.getLogRecordForDisplay()); + } + undo(logRecord, datasetLifecycleManager); + undoCount++; + } + } + + if (LOGGER.isLoggable(Level.INFO)) { + LOGGER.info("undone loser transaction's effect"); + LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" + + entityCommitLogCount + "/" + undoCount); + } + } finally { + logReader.close(); + } + } + + @Override + public void start() { + //no op + } + + @Override + public void stop(boolean dumpState, OutputStream os) throws IOException { + // Shutdown checkpoint + checkpointManager.doSharpCheckpoint(); + } + + @Override + public void dumpState(OutputStream os) throws IOException { + // do nothing + } + + private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { + try { + ILSMIndex index = + (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); + ILSMIndexAccessor indexAccessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { + indexAccessor.forceDelete(logRecord.getNewValue()); + } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { + indexAccessor.forceInsert(logRecord.getNewValue()); + } else { + throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to undo", e); + } + } + + private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { + try { + int datasetId = logRecord.getDatasetId(); + long resourceId = logRecord.getResourceId(); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId); + 
ILSMIndexAccessor indexAccessor = + index.createAccessor(NoOpOperationCallback.INSTANCE, NoOpOperationCallback.INSTANCE); + if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { + indexAccessor.forceInsert(logRecord.getNewValue()); + } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { + indexAccessor.forceDelete(logRecord.getNewValue()); + } else { + throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); + } + } catch (Exception e) { + throw new IllegalStateException("Failed to redo", e); + } + } + + private class JobEntityCommits { + private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; + private final int jobId; + private final Set<TxnId> cachedEntityCommitTxns = new HashSet<>(); + private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<>(); + //a flag indicating whether all the the commits for this jobs have been added. + private boolean preparedForSearch = false; + private TxnId winnerEntity = null; + private int currentPartitionSize = 0; + private long partitionMaxLSN = 0; + private String currentPartitonName; + + public JobEntityCommits(int jobId) { + this.jobId = jobId; + } + + public void add(ILogRecord logRecord) throws IOException { + if (preparedForSearch) { + throw new IOException("Cannot add new entity commits after preparing for search."); + } + winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(), + logRecord.getPKValue(), logRecord.getPKValueSize(), true); + cachedEntityCommitTxns.add(winnerEntity); + //since log file is read sequentially, LSNs are always increasing + partitionMaxLSN = logRecord.getLSN(); + currentPartitionSize += winnerEntity.getCurrentSize(); + //if the memory budget for the current partition exceeded the limit, spill it to disk and free memory + if (currentPartitionSize >= cachedEntityCommitsPerJobSize) { + spillToDiskAndfreeMemory(); + } + } + + public void spillToDiskAndfreeMemory() throws 
IOException { + if (cachedEntityCommitTxns.size() > 0) { + if (!preparedForSearch) { + writeCurrentPartitionToDisk(); + } + cachedEntityCommitTxns.clear(); + partitionMaxLSN = 0; + currentPartitionSize = 0; + currentPartitonName = ""; + } + } + + /** + * Call this method when no more entity commits will be added to this job. + * + * @throws IOException + */ + public void prepareForSearch() throws IOException { + //if we have anything left in memory, we need to spill them to disk before searching other partitions. + //However, if we don't have anything on disk, we will search from memory only + if (jobEntitCommitOnDiskPartitionsFiles.size() > 0) { + spillToDiskAndfreeMemory(); + } else { + //set the name of the current in memory partition to the current partition + currentPartitonName = getPartitionName(partitionMaxLSN); + } + preparedForSearch = true; + } + + public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException { + //if we don't have any partitions on disk, search only from memory + if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) { + return cachedEntityCommitTxns.contains(txnId); + } else { + //get candidate partitions from disk + ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN); + for (File partition : candidatePartitions) { + if (serachPartition(partition, txnId)) { + return true; + } + } + } + return false; + } + + /** + * @param logLSN + * @return partitions that have a max LSN > logLSN + */ + public ArrayList<File> getCandidiatePartitions(long logLSN) { + ArrayList<File> candidiatePartitions = new ArrayList<>(); + for (File partition : jobEntitCommitOnDiskPartitionsFiles) { + String partitionName = partition.getName(); + // entity commit log must come after the update log, + // therefore, consider only partitions with max LSN > logLSN + if (getPartitionMaxLSNFromName(partitionName) > logLSN) { + candidiatePartitions.add(partition); + } + } + + return candidiatePartitions; + } + + public void clear() 
{ + cachedEntityCommitTxns.clear(); + for (File partition : jobEntitCommitOnDiskPartitionsFiles) { + partition.delete(); + } + jobEntitCommitOnDiskPartitionsFiles.clear(); + } + + private boolean serachPartition(File partition, TxnId txnId) throws IOException { + //load partition from disk if it is not already in memory + if (!partition.getName().equals(currentPartitonName)) { + loadPartitionToMemory(partition, cachedEntityCommitTxns); + currentPartitonName = partition.getName(); + } + return cachedEntityCommitTxns.contains(txnId); + } + + private String getPartitionName(long maxLSN) { + return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN; + } + + private long getPartitionMaxLSNFromName(String partitionName) { + return Long.valueOf(partitionName.substring(partitionName.indexOf(PARTITION_FILE_NAME_SEPARATOR) + 1)); + } + + private void writeCurrentPartitionToDisk() throws IOException { + //if we don't have enough memory to allocate for this partition, + // we will ask recovery manager to free memory + if (needToFreeMemory()) { + freeJobsCachedEntities(jobId); + } + //allocate a buffer that can hold the current partition + ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize); + for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) { + TxnId txnId = iterator.next(); + //serialize the object and remove it from memory + txnId.serialize(buffer); + iterator.remove(); + } + //name partition file based on job id and max lsn + File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN)); + //write file to disk + try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false); + FileChannel fileChannel = fileOutputstream.getChannel()) { + buffer.flip(); + while (buffer.hasRemaining()) { + fileChannel.write(buffer); + } + } + jobEntitCommitOnDiskPartitionsFiles.add(partitionFile); + } + + private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException { + 
partitionTxn.clear(); + //if we don't have enough memory to a load partition, we will ask recovery manager to free memory + if (needToFreeMemory()) { + freeJobsCachedEntities(jobId); + } + ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length()); + //load partition to memory + try (InputStream is = new FileInputStream(partition)) { + int readByte; + while ((readByte = is.read()) != -1) { + buffer.put((byte) readByte); + } + } + buffer.flip(); + TxnId temp = null; + while (buffer.remaining() != 0) { + temp = TxnId.deserialize(buffer); + partitionTxn.add(temp); + } + } + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java new file mode 100644 index 0000000..808a252 --- /dev/null +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.app.nc; + +import java.util.concurrent.Callable; +import java.util.logging.Logger; + +import org.apache.asterix.common.config.ClusterProperties; +import org.apache.asterix.common.config.IPropertiesProvider; +import org.apache.asterix.common.config.ReplicationProperties; +import org.apache.asterix.common.config.TransactionProperties; +import org.apache.asterix.common.exceptions.ACIDException; +import org.apache.asterix.common.transactions.Checkpoint; +import org.apache.asterix.common.transactions.CheckpointProperties; +import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; +import org.apache.asterix.common.transactions.ICheckpointManager; +import org.apache.asterix.common.transactions.ILockManager; +import org.apache.asterix.common.transactions.ILogManager; +import org.apache.asterix.common.transactions.IRecoveryManager; +import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.ITransactionSubsystem; +import org.apache.asterix.common.utils.StorageConstants; +import org.apache.asterix.common.utils.TransactionUtil; +import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager; +import org.apache.asterix.transaction.management.service.logging.LogManager; +import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication; +import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory; +import org.apache.asterix.transaction.management.service.transaction.TransactionManager; +import org.apache.hyracks.api.application.INCApplicationContext; + +/** + * Provider for all the sub-systems (transaction/lock/log/recovery) managers. + * Users of transaction sub-systems must obtain them from the provider. 
+ */ +public class TransactionSubsystem implements ITransactionSubsystem { + private final String id; + private final ILogManager logManager; + private final ILockManager lockManager; + private final ITransactionManager transactionManager; + private final IRecoveryManager recoveryManager; + private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider; + private final TransactionProperties txnProperties; + private final ICheckpointManager checkpointManager; + + //for profiling purpose + private long profilerEntityCommitLogCount = 0; + private EntityCommitProfiler ecp; + + public TransactionSubsystem(INCApplicationContext appCtx, String id, + IAppRuntimeContextProvider asterixAppRuntimeContextProvider, TransactionProperties txnProperties) + throws ACIDException { + this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; + this.id = id; + this.txnProperties = txnProperties; + this.transactionManager = new TransactionManager(this); + this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); + final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); + final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id); + checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); + final Checkpoint latestCheckpoint = checkpointManager.getLatest(); + if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { + throw new IllegalStateException( + String.format("Storage version mismatch. Current version (%s). 
On disk version: (%s)", + latestCheckpoint.getStorageVersion(), StorageConstants.VERSION)); + } + + ReplicationProperties asterixReplicationProperties = null; + if (asterixAppRuntimeContextProvider != null) { + asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider.getAppContext()) + .getReplicationProperties(); + } + + if (asterixReplicationProperties != null && replicationEnabled) { + this.logManager = new LogManagerWithReplication(this); + } else { + this.logManager = new LogManager(this); + } + this.recoveryManager = new RecoveryManager(this, appCtx); + + if (TransactionUtil.PROFILE_MODE) { + ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval()); + getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp); + } + } + + @Override + public ILogManager getLogManager() { + return logManager; + } + + @Override + public ILockManager getLockManager() { + return lockManager; + } + + @Override + public ITransactionManager getTransactionManager() { + return transactionManager; + } + + @Override + public IRecoveryManager getRecoveryManager() { + return recoveryManager; + } + + @Override + public IAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() { + return asterixAppRuntimeContextProvider; + } + + @Override + public TransactionProperties getTransactionProperties() { + return txnProperties; + } + + @Override + public String getId() { + return id; + } + + @Override + public void incrementEntityCommitCount() { + ++profilerEntityCommitLogCount; + } + + @Override + public ICheckpointManager getCheckpointManager() { + return checkpointManager; + } + + /** + * Thread for profiling entity level commit count + * This thread takes a report interval (in seconds) parameter and + * reports entity level commit count every report interval (in seconds) + * only if IS_PROFILE_MODE is set to true. + * However, the thread doesn't start reporting the count until the entityCommitCount > 0. 
+ */ + static class EntityCommitProfiler implements Callable<Boolean> { + private static final Logger LOGGER = Logger.getLogger(EntityCommitProfiler.class.getName()); + private final long reportIntervalInMillisec; + private long lastEntityCommitCount; + private int reportIntervalInSeconds; + private TransactionSubsystem txnSubsystem; + private boolean firstReport = true; + private long startTimeStamp = 0; + private long reportRound = 1; + + public EntityCommitProfiler(TransactionSubsystem txnSubsystem, int reportIntervalInSeconds) { + Thread.currentThread().setName("EntityCommitProfiler-Thread"); + this.txnSubsystem = txnSubsystem; + this.reportIntervalInSeconds = reportIntervalInSeconds; + this.reportIntervalInMillisec = reportIntervalInSeconds * 1000L; + lastEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount; + } + + @Override + public Boolean call() throws Exception { + while (true) { + Thread.sleep(reportIntervalInMillisec); + if (txnSubsystem.profilerEntityCommitLogCount > 0) { + if (firstReport) { + startTimeStamp = System.currentTimeMillis(); + firstReport = false; + } + outputCount(); + } + } + } + + private void outputCount() { + long currentTimeStamp = System.currentTimeMillis(); + long currentEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount; + + LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound + "], AbsoluteTimeStamp[" + + currentTimeStamp + "], ActualRelativeTimeStamp[" + (currentTimeStamp - startTimeStamp) + + "], ExpectedRelativeTimeStamp[" + (reportIntervalInSeconds * reportRound) + "], IIPS[" + + ((currentEntityCommitCount - lastEntityCommitCount) / reportIntervalInSeconds) + "], IPS[" + + (currentEntityCommitCount / (reportRound * reportIntervalInSeconds)) + "]"); + + lastEntityCommitCount = currentEntityCommitCount; + ++reportRound; + } + } + +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java index 9b1a3db..f401576 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultPrinter.java @@ -27,6 +27,7 @@ import java.nio.ByteBuffer; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.databind.node.ObjectNode; + import org.apache.asterix.common.utils.JSONUtil; import org.apache.asterix.om.types.ARecordType; import org.apache.asterix.translator.IStatementExecutor.Stats; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java index bffaf1f..b0677d8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/result/ResultReader.java @@ -18,7 +18,7 @@ */ package org.apache.asterix.app.result; -import org.apache.asterix.runtime.util.AppContextInfo; +import org.apache.asterix.runtime.utils.AppContextInfo; import org.apache.hyracks.api.comm.IFrame; import org.apache.hyracks.api.comm.IFrameTupleAccessor; import org.apache.hyracks.api.dataset.DatasetJobRecord.Status; http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java 
---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java index 6cdf329..99dcc83 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/DefaultStatementExecutorFactory.java @@ -20,17 +20,18 @@ package org.apache.asterix.app.translator; import java.util.List; +import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.compiler.provider.ILangCompilationProvider; import org.apache.asterix.lang.common.base.Statement; +import org.apache.asterix.translator.IStatementExecutor; import org.apache.asterix.translator.IStatementExecutorFactory; import org.apache.asterix.translator.SessionConfig; public class DefaultStatementExecutorFactory implements IStatementExecutorFactory { @Override - public QueryTranslator create(List<Statement> aqlStatements, SessionConfig conf, - ILangCompilationProvider compilationProvider) { - return new QueryTranslator(aqlStatements, conf, compilationProvider); + public IStatementExecutor create(List<Statement> statements, SessionConfig conf, + ILangCompilationProvider compilationProvider, IStorageComponentProvider storageComponentProvider) { + return new QueryTranslator(statements, conf, compilationProvider, storageComponentProvider); } - }
