http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java deleted file mode 100644 index f8b6384..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java +++ /dev/null @@ -1,863 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.recovery; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback; -import org.apache.asterix.common.replication.IReplicaResourcesManager; -import org.apache.asterix.common.transactions.Checkpoint; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; -import org.apache.asterix.common.transactions.ICheckpointManager; -import org.apache.asterix.common.transactions.ILogReader; -import org.apache.asterix.common.transactions.ILogRecord; -import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.common.transactions.ITransactionContext; -import org.apache.asterix.common.transactions.LogType; -import org.apache.asterix.common.transactions.Resource; -import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; -import org.apache.asterix.transaction.management.service.logging.LogManager; -import org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants; -import org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem; -import org.apache.commons.io.FileUtils; -import org.apache.hyracks.api.exceptions.HyracksDataException; -import org.apache.hyracks.api.lifecycle.ILifeCycleComponent; -import org.apache.hyracks.storage.am.common.api.IIndex; -import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback; -import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor; -import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex; -import org.apache.hyracks.storage.common.file.LocalResource; - -/** - * This is the Recovery Manager and is responsible for rolling back a - * transaction as well as doing a system recovery. - */ -public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { - - public static final boolean IS_DEBUG_MODE = false; - private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName()); - private final TransactionSubsystem txnSubsystem; - private final LogManager logMgr; - private final boolean replicationEnabled; - private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp"; - private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null; - private final long cachedEntityCommitsPerJobSize; - private final PersistentLocalResourceRepository localResourceRepository; - private final ICheckpointManager checkpointManager; - private SystemState state; - - public RecoveryManager(TransactionSubsystem txnSubsystem) { - this.txnSubsystem = txnSubsystem; - logMgr = (LogManager) txnSubsystem.getLogManager(); - replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); - localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() - .getLocalResourceRepository(); - cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize(); - checkpointManager = txnSubsystem.getCheckpointManager(); - } - - /** - * returns system state which could be one of the three states: HEALTHY, RECOVERING, CORRUPTED. - * This state information could be used in a case where more than one thread is running - * in the bootstrap process to provide higher availability. In other words, while the system - * is recovered, another thread may start a new transaction with understanding the side effect - * of the operation, or the system can be recovered concurrently. This kind of concurrency is - * not supported, yet. - */ - @Override - public SystemState getSystemState() throws ACIDException { - //read checkpoint file - Checkpoint checkpointObject = checkpointManager.getLatest(); - if (checkpointObject == null) { - //The checkpoint file doesn't exist => Failure happened during NC initialization. - //Retry to initialize the NC by setting the state to NEW_UNIVERSE - state = SystemState.NEW_UNIVERSE; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("The checkpoint file doesn't exist: systemState = NEW_UNIVERSE"); - } - return state; - } - - if (replicationEnabled) { - if (checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { - //no logs exist - state = SystemState.HEALTHY; - return state; - } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() && checkpointObject.isSharp()) { - //only remote logs exist - state = SystemState.HEALTHY; - return state; - } else { - //need to perform remote recovery - state = SystemState.CORRUPTED; - return state; - } - } else { - long readableSmallestLSN = logMgr.getReadableSmallestLSN(); - if (logMgr.getAppendLSN() == readableSmallestLSN) { - if (checkpointObject.getMinMCTFirstLsn() != AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { - LOGGER.warning("Some(or all) of transaction log files are lost."); - //No choice but continuing when the log files are lost. - } - state = SystemState.HEALTHY; - return state; - } else if (checkpointObject.getCheckpointLsn() == logMgr.getAppendLSN() - && checkpointObject.getMinMCTFirstLsn() == AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) { - state = SystemState.HEALTHY; - return state; - } else { - state = SystemState.CORRUPTED; - return state; - } - } - } - - //This method is used only when replication is disabled. - @Override - public void startRecovery(boolean synchronous) throws IOException, ACIDException { - state = SystemState.RECOVERING; - LOGGER.log(Level.INFO, "starting recovery ..."); - - long readableSmallestLSN = logMgr.getReadableSmallestLSN(); - Checkpoint checkpointObject = checkpointManager.getLatest(); - long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn(); - if (lowWaterMarkLSN < readableSmallestLSN) { - lowWaterMarkLSN = readableSmallestLSN; - } - - //delete any recovery files from previous failed recovery attempts - deleteRecoveryTemporaryFiles(); - - //get active partitions on this node - Set<Integer> activePartitions = localResourceRepository.getNodeOrignalPartitions(); - replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), lowWaterMarkLSN); - } - - @Override - public synchronized void replayPartitionsLogs(Set<Integer> partitions, ILogReader logReader, long lowWaterMarkLSN) - throws IOException, ACIDException { - try { - Set<Integer> winnerJobSet = startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN); - startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, winnerJobSet); - } finally { - logReader.close(); - deleteRecoveryTemporaryFiles(); - } - } - - private synchronized Set<Integer> startRecoverysAnalysisPhase(Set<Integer> partitions, ILogReader logReader, - long lowWaterMarkLSN) throws IOException, ACIDException { - int updateLogCount = 0; - int entityCommitLogCount = 0; - int jobCommitLogCount = 0; - int abortLogCount = 0; - int jobId = -1; - - Set<Integer> winnerJobSet = new HashSet<>(); - jobId2WinnerEntitiesMap = new HashMap<>(); - - //set log reader to the lowWaterMarkLsn - ILogRecord logRecord = null; - logReader.initializeScan(lowWaterMarkLSN); - - //collect all committed Lsn - JobEntityCommits jobEntityWinners = null; - - logRecord = logReader.next(); - while (logRecord != null) { - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - switch (logRecord.getLogType()) { - case LogType.UPDATE: - if (partitions.contains(logRecord.getResourcePartition())) { - updateLogCount++; - } - break; - case LogType.JOB_COMMIT: - jobId = logRecord.getJobId(); - winnerJobSet.add(jobId); - if (jobId2WinnerEntitiesMap.containsKey(jobId)) { - jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); - //to delete any spilled files as well - jobEntityWinners.clear(); - jobId2WinnerEntitiesMap.remove(jobId); - } - jobCommitLogCount++; - break; - case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: - if (partitions.contains(logRecord.getResourcePartition())) { - jobId = logRecord.getJobId(); - if (!jobId2WinnerEntitiesMap.containsKey(jobId)) { - jobEntityWinners = new JobEntityCommits(jobId); - if (needToFreeMemory()) { - //if we don't have enough memory for one more job, we will force all jobs to spill their cached entities to disk. - //This could happen only when we have many jobs with small number of records and none of them have job commit. - freeJobsCachedEntities(jobId); - } - jobId2WinnerEntitiesMap.put(jobId, jobEntityWinners); - } else { - jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); - } - jobEntityWinners.add(logRecord); - entityCommitLogCount++; - } - break; - case LogType.ABORT: - abortLogCount++; - break; - case LogType.FLUSH: - case LogType.WAIT: - case LogType.MARKER: - break; - default: - throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); - } - logRecord = logReader.next(); - } - - //prepare winners for search after analysis is done to flush anything remaining in memory to disk. - for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) { - winners.prepareForSearch(); - } - - LOGGER.info("Logs analysis phase completed."); - LOGGER.info("Analysis log count update/entityCommit/jobCommit/abort = " + updateLogCount + "/" - + entityCommitLogCount + "/" + jobCommitLogCount + "/" + abortLogCount); - - return winnerJobSet; - } - - private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader, - long lowWaterMarkLSN, Set<Integer> winnerJobSet) throws IOException, ACIDException { - int redoCount = 0; - int jobId = -1; - - long resourceId; - long maxDiskLastLsn; - long LSN = -1; - ILSMIndex index = null; - LocalResource localResource = null; - Resource localResourceMetadata = null; - boolean foundWinner = false; - JobEntityCommits jobEntityWinners = null; - - IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider(); - IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager(); - - Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources(); - Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>(); - TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); - - ILogRecord logRecord = null; - try { - logReader.initializeScan(lowWaterMarkLSN); - logRecord = logReader.next(); - while (logRecord != null) { - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - LSN = logRecord.getLSN(); - jobId = logRecord.getJobId(); - foundWinner = false; - switch (logRecord.getLogType()) { - case LogType.UPDATE: - if (partitions.contains(logRecord.getResourcePartition())) { - if (winnerJobSet.contains(jobId)) { - foundWinner = true; - } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) { - jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId); - tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize()); - if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) { - foundWinner = true; - } - } - if (foundWinner) { - resourceId = logRecord.getResourceId(); - localResource = resourcesMap.get(resourceId); - /******************************************************************* - * [Notice] - * -> Issue - * Delete index may cause a problem during redo. - * The index operation to be redone couldn't be redone because the corresponding index - * may not exist in NC due to the possible index drop DDL operation. - * -> Approach - * Avoid the problem during redo. - * More specifically, the problem will be detected when the localResource of - * the corresponding index is retrieved, which will end up with 'null'. - * If null is returned, then just go and process the next - * log record. - *******************************************************************/ - if (localResource == null) { - logRecord = logReader.next(); - continue; - } - /*******************************************************************/ - - //get index instance from IndexLifeCycleManager - //if index is not registered into IndexLifeCycleManager, - //create the index using LocalMetadata stored in LocalResourceRepository - //get partition path in this node - localResourceMetadata = (Resource) localResource.getResource(); - index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath()); - if (index == null) { - //#. create index instance and register to indexLifeCycleManager - index = localResourceMetadata.createIndexInstance(appRuntimeContext, localResource); - datasetLifecycleManager.register(localResource.getPath(), index); - datasetLifecycleManager.open(localResource.getPath()); - - //#. get maxDiskLastLSN - ILSMIndex lsmIndex = index; - try { - maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex - .getIOOperationCallback()) - .getComponentLSN(lsmIndex.getImmutableComponents()); - } catch (HyracksDataException e) { - datasetLifecycleManager.close(localResource.getPath()); - throw e; - } - - //#. set resourceId and maxDiskLastLSN to the map - resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn); - } else { - maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId); - } - - if (LSN > maxDiskLastLsn) { - redo(logRecord, datasetLifecycleManager); - redoCount++; - } - } - } - break; - case LogType.JOB_COMMIT: - case LogType.ENTITY_COMMIT: - case LogType.ABORT: - case LogType.FLUSH: - case LogType.UPSERT_ENTITY_COMMIT: - case LogType.WAIT: - case LogType.MARKER: - //do nothing - break; - default: - throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); - } - logRecord = logReader.next(); - } - LOGGER.info("Logs REDO phase completed. Redo logs count: " + redoCount); - } finally { - //close all indexes - Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet(); - for (long r : resourceIdList) { - datasetLifecycleManager.close(resourcesMap.get(r).getPath()); - } - } - } - - private boolean needToFreeMemory() { - return Runtime.getRuntime().freeMemory() < cachedEntityCommitsPerJobSize; - } - - @Override - public long getMinFirstLSN() throws HyracksDataException { - long minFirstLSN = getLocalMinFirstLSN(); - - //if replication is enabled, consider replica resources min LSN - if (replicationEnabled) { - long remoteMinFirstLSN = getRemoteMinFirstLSN(); - minFirstLSN = Math.min(minFirstLSN, remoteMinFirstLSN); - } - - return minFirstLSN; - } - - @Override - public long getLocalMinFirstLSN() throws HyracksDataException { - IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getDatasetLifecycleManager(); - List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources(); - long firstLSN; - //the min first lsn can only be the current append or smaller - long minFirstLSN = logMgr.getAppendLSN(); - if (openIndexList.size() > 0) { - for (IIndex index : openIndexList) { - AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index) - .getIOOperationCallback(); - - if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) { - firstLSN = ioCallback.getFirstLSN(); - minFirstLSN = Math.min(minFirstLSN, firstLSN); - } - } - } - return minFirstLSN; - } - - private long getRemoteMinFirstLSN() { - IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getAppContext().getReplicaResourcesManager(); - long minRemoteLSN = remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions()); - return minRemoteLSN; - } - - @Override - public File createJobRecoveryFile(int jobId, String fileName) throws IOException { - String recoveryDirPath = getRecoveryDirPath(); - Path JobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + jobId); - if (!Files.exists(JobRecoveryFolder)) { - Files.createDirectories(JobRecoveryFolder); - } - - File jobRecoveryFile = new File(JobRecoveryFolder.toString() + File.separator + fileName); - if (!jobRecoveryFile.exists()) { - jobRecoveryFile.createNewFile(); - } else { - throw new IOException("File: " + fileName + " for job id(" + jobId + ") already exists"); - } - - return jobRecoveryFile; - } - - @Override - public void deleteRecoveryTemporaryFiles() { - String recoveryDirPath = getRecoveryDirPath(); - Path recoveryFolderPath = Paths.get(recoveryDirPath); - FileUtils.deleteQuietly(recoveryFolderPath.toFile()); - } - - private String getRecoveryDirPath() { - String logDir = logMgr.getLogManagerProperties().getLogDir(); - if (!logDir.endsWith(File.separator)) { - logDir += File.separator; - } - - return logDir + RECOVERY_FILES_DIR_NAME; - } - - private void freeJobsCachedEntities(int requestingJobId) throws IOException { - if (jobId2WinnerEntitiesMap != null) { - for (Entry<Integer, JobEntityCommits> jobEntityCommits : jobId2WinnerEntitiesMap.entrySet()) { - //if the job is not the requester, free its memory - if (jobEntityCommits.getKey() != requestingJobId) { - jobEntityCommits.getValue().spillToDiskAndfreeMemory(); - } - } - } - } - - @Override - public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException { - int abortedJobId = txnContext.getJobId().getId(); - // Obtain the first/last log record LSNs written by the Job - long firstLSN = txnContext.getFirstLSN(); - /** - * The effect of any log record with LSN below minFirstLSN has already been written to disk and - * will not be rolled back. Therefore, we will set the first LSN of the job to the maximum of - * minFirstLSN and the job's first LSN. - */ - try { - long localMinFirstLSN = getLocalMinFirstLSN(); - firstLSN = Math.max(firstLSN, localMinFirstLSN); - } catch (HyracksDataException e) { - throw new ACIDException(e); - } - long lastLSN = txnContext.getLastLSN(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("rollbacking transaction log records from " + firstLSN + " to " + lastLSN); - } - // check if the transaction actually wrote some logs. - if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) { - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info( - "no need to roll back as there were no operations by the transaction " + txnContext.getJobId()); - } - return; - } - - // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN); - } - - Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>(); - TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false); - int updateLogCount = 0; - int entityCommitLogCount = 0; - int logJobId = -1; - long currentLSN = -1; - TxnId loserEntity = null; - List<Long> undoLSNSet = null; - //get active partitions on this node - Set<Integer> activePartitions = localResourceRepository.getActivePartitions(); - ILogReader logReader = logMgr.getLogReader(false); - try { - logReader.initializeScan(firstLSN); - ILogRecord logRecord = null; - while (currentLSN < lastLSN) { - logRecord = logReader.next(); - if (logRecord == null) { - break; - } else { - currentLSN = logRecord.getLSN(); - - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - } - logJobId = logRecord.getJobId(); - if (logJobId != abortedJobId) { - continue; - } - tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize()); - switch (logRecord.getLogType()) { - case LogType.UPDATE: - if (activePartitions.contains(logRecord.getResourcePartition())) { - undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId); - if (undoLSNSet == null) { - loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize(), true); - undoLSNSet = new LinkedList<>(); - jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet); - } - undoLSNSet.add(currentLSN); - updateLogCount++; - if (IS_DEBUG_MODE) { - LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:" - + tempKeyTxnId); - } - } - break; - case LogType.ENTITY_COMMIT: - case LogType.UPSERT_ENTITY_COMMIT: - if (activePartitions.contains(logRecord.getResourcePartition())) { - jobLoserEntity2LSNsMap.remove(tempKeyTxnId); - entityCommitLogCount++; - if (IS_DEBUG_MODE) { - LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]" - + tempKeyTxnId); - } - } - break; - case LogType.JOB_COMMIT: - throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort."); - case LogType.ABORT: - case LogType.FLUSH: - case LogType.WAIT: - case LogType.MARKER: - //ignore - break; - default: - throw new ACIDException("Unsupported LogType: " + logRecord.getLogType()); - } - } - - if (currentLSN != lastLSN) { - throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN - + ") during abort( " + txnContext.getJobId() + ")"); - } - - //undo loserTxn's effect - LOGGER.log(Level.INFO, "undoing loser transaction's effect"); - - IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() - .getDatasetLifecycleManager(); - //TODO sort loser entities by smallest LSN to undo in one pass. - Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); - int undoCount = 0; - while (iter.hasNext()) { - Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next(); - undoLSNSet = loserEntity2LSNsMap.getValue(); - // The step below is important since the upsert operations must be done in reverse order. - Collections.reverse(undoLSNSet); - for (long undoLSN : undoLSNSet) { - //here, all the log records are UPDATE type. So, we don't need to check the type again. - //read the corresponding log record to be undone. - logRecord = logReader.read(undoLSN); - if (logRecord == null) { - throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")"); - } - if (IS_DEBUG_MODE) { - LOGGER.info(logRecord.getLogRecordForDisplay()); - } - undo(logRecord, datasetLifecycleManager); - undoCount++; - } - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("undone loser transaction's effect"); - LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/" - + entityCommitLogCount + "/" + undoCount); - } - } finally { - logReader.close(); - } - } - - @Override - public void start() { - //no op - } - - @Override - public void stop(boolean dumpState, OutputStream os) throws IOException { - // Shutdown checkpoint - checkpointManager.doSharpCheckpoint(); - } - - @Override - public void dumpState(OutputStream os) throws IOException { - // do nothing - } - - private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { - try { - ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), - logRecord.getResourceId()); - ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); - if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { - indexAccessor.forceDelete(logRecord.getNewValue()); - } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { - indexAccessor.forceInsert(logRecord.getNewValue()); - } else { - throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); - } - } catch (Exception e) { - throw new IllegalStateException("Failed to undo", e); - } - } - - private static void redo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { - try { - int datasetId = logRecord.getDatasetId(); - long resourceId = logRecord.getResourceId(); - ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(datasetId, resourceId); - ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpOperationCallback.INSTANCE, - NoOpOperationCallback.INSTANCE); - if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) { - indexAccessor.forceInsert(logRecord.getNewValue()); - } else if (logRecord.getNewOp() == IndexOperation.DELETE.ordinal()) { - indexAccessor.forceDelete(logRecord.getNewValue()); - } else { - throw new IllegalStateException("Unsupported OperationType: " + logRecord.getNewOp()); - } - } catch (Exception e) { - throw new IllegalStateException("Failed to redo", e); - } - } - - private class JobEntityCommits { - private static final String PARTITION_FILE_NAME_SEPARATOR = "_"; - private final int jobId; - private final Set<TxnId> cachedEntityCommitTxns = new HashSet<>(); - private final List<File> jobEntitCommitOnDiskPartitionsFiles = new ArrayList<>(); - //a flag indicating whether all the the commits for this jobs have been added. - private boolean preparedForSearch = false; - private TxnId winnerEntity = null; - private int currentPartitionSize = 0; - private long partitionMaxLSN = 0; - private String currentPartitonName; - - public JobEntityCommits(int jobId) { - this.jobId = jobId; - } - - public void add(ILogRecord logRecord) throws IOException { - if (preparedForSearch) { - throw new IOException("Cannot add new entity commits after preparing for search."); - } - winnerEntity = new TxnId(logRecord.getJobId(), logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize(), true); - cachedEntityCommitTxns.add(winnerEntity); - //since log file is read sequentially, LSNs are always increasing - partitionMaxLSN = logRecord.getLSN(); - currentPartitionSize += winnerEntity.getCurrentSize(); - //if the memory budget for the current partition exceeded the limit, spill it to disk and free memory - if (currentPartitionSize >= cachedEntityCommitsPerJobSize) { - spillToDiskAndfreeMemory(); - } - } - - public void spillToDiskAndfreeMemory() throws IOException { - if (cachedEntityCommitTxns.size() > 0) { - if (!preparedForSearch) { - writeCurrentPartitionToDisk(); - } - cachedEntityCommitTxns.clear(); - partitionMaxLSN = 0; - currentPartitionSize = 0; - currentPartitonName = ""; - } - } - - /** - * Call this method when no more entity commits will be added to this job. - * - * @throws IOException - */ - public void prepareForSearch() throws IOException { - //if we have anything left in memory, we need to spill them to disk before searching other partitions. - //However, if we don't have anything on disk, we will search from memory only - if (jobEntitCommitOnDiskPartitionsFiles.size() > 0) { - spillToDiskAndfreeMemory(); - } else { - //set the name of the current in memory partition to the current partition - currentPartitonName = getPartitionName(partitionMaxLSN); - } - preparedForSearch = true; - } - - public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) throws IOException { - //if we don't have any partitions on disk, search only from memory - if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) { - return cachedEntityCommitTxns.contains(txnId); - } else { - //get candidate partitions from disk - ArrayList<File> candidatePartitions = getCandidiatePartitions(logLSN); - for (File partition : candidatePartitions) { - if (serachPartition(partition, txnId)) { - return true; - } - } - } - return false; - } - - /** - * @param logLSN - * @return partitions that have a max LSN > logLSN - */ - public ArrayList<File> getCandidiatePartitions(long logLSN) { - ArrayList<File> candidiatePartitions = new ArrayList<>(); - for (File partition : jobEntitCommitOnDiskPartitionsFiles) { - String partitionName = partition.getName(); - //entity commit log must come after the update log, therefore, consider only partitions with max LSN > logLSN - if (getPartitionMaxLSNFromName(partitionName) > logLSN) { - candidiatePartitions.add(partition); - } - } - - return candidiatePartitions; - } - - public void clear() { - cachedEntityCommitTxns.clear(); - for (File partition : jobEntitCommitOnDiskPartitionsFiles) { - partition.delete(); - } - jobEntitCommitOnDiskPartitionsFiles.clear(); - } - - private boolean serachPartition(File partition, TxnId txnId) throws IOException { - //load partition from disk if it is not already in memory - if (!partition.getName().equals(currentPartitonName)) { - loadPartitionToMemory(partition, cachedEntityCommitTxns); - currentPartitonName = partition.getName(); - } - return cachedEntityCommitTxns.contains(txnId); - } - - private String getPartitionName(long maxLSN) { - return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN; - } - - private long getPartitionMaxLSNFromName(String partitionName) { - return Long.valueOf(partitionName.substring(partitionName.indexOf(PARTITION_FILE_NAME_SEPARATOR) + 1)); - } - - private void writeCurrentPartitionToDisk() throws IOException { - //if we don't have enough memory to allocate for this partition, we will ask recovery manager to free memory - if (needToFreeMemory()) { - freeJobsCachedEntities(jobId); - } - //allocate a buffer that can hold the current partition - ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize); - for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); iterator.hasNext();) { - TxnId txnId = iterator.next(); - //serialize the object and remove it from memory - txnId.serialize(buffer); - iterator.remove(); - } - //name partition file based on job id and max lsn - File partitionFile = createJobRecoveryFile(jobId, getPartitionName(partitionMaxLSN)); - //write file to disk - try (FileOutputStream fileOutputstream = new FileOutputStream(partitionFile, false); - FileChannel fileChannel = fileOutputstream.getChannel()) { - buffer.flip(); - while (buffer.hasRemaining()) { - fileChannel.write(buffer); - } - } - jobEntitCommitOnDiskPartitionsFiles.add(partitionFile); - } - - private void loadPartitionToMemory(File partition, Set<TxnId> partitionTxn) throws IOException { - partitionTxn.clear(); - //if we don't have enough memory to a load partition, we will ask recovery manager to free memory - if (needToFreeMemory()) { - freeJobsCachedEntities(jobId); - } - ByteBuffer buffer = ByteBuffer.allocateDirect((int) partition.length()); - //load partition to memory - try (InputStream is = new FileInputStream(partition)) { - int readByte; - while ((readByte = is.read()) != -1) { - buffer.put((byte) readByte); - } - } - buffer.flip(); - TxnId temp = null; - while (buffer.remaining() != 0) { - temp = TxnId.deserialize(buffer); - partitionTxn.add(temp); - } - } - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java deleted file mode 100644 index 8c6e253..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.transaction; - -import org.apache.asterix.common.api.IAppRuntimeContext; -import org.apache.asterix.common.api.IDatasetLifecycleManager; -import org.apache.hyracks.api.context.IHyracksTaskContext; -import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler; -import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider; -import org.apache.hyracks.storage.common.IStorageManagerInterface; -import org.apache.hyracks.storage.common.buffercache.IBufferCache; -import org.apache.hyracks.storage.common.file.IFileMapProvider; -import org.apache.hyracks.storage.common.file.ILocalResourceRepository; -import org.apache.hyracks.storage.common.file.IResourceIdFactory; - -public class AsterixRuntimeComponentsProvider implements IIndexLifecycleManagerProvider, IStorageManagerInterface, - ILSMIOOperationSchedulerProvider { - - private static final long serialVersionUID = 1L; - - public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = new AsterixRuntimeComponentsProvider(); - - private AsterixRuntimeComponentsProvider() { - } - - @Override - public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getLSMIOScheduler(); - } - - @Override - public IBufferCache getBufferCache(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getBufferCache(); - } - - @Override - public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getFileMapManager(); - } - - @Override - public ILocalResourceRepository getLocalResourceRepository(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getLocalResourceRepository(); - } - - @Override - public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getDatasetLifecycleManager(); - } - - @Override - public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) { - return ((IAppRuntimeContext) ctx.getJobletContext().getApplicationContext().getApplicationObject()) - .getResourceIdFactory(); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java index 215eb14..ad03a25 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java @@ -98,7 +98,7 @@ public class TransactionContext implements ITransactionContext, Serializable { // creations. // also, the pool can throttle the number of concurrent active jobs at every // moment. - public TransactionContext(JobId jobId, TransactionSubsystem transactionSubsystem) throws ACIDException { + public TransactionContext(JobId jobId) throws ACIDException { this.jobId = jobId; firstLSN = new AtomicLong(-1); lastLSN = new AtomicLong(-1); @@ -106,7 +106,7 @@ public class TransactionContext implements ITransactionContext, Serializable { isTimeout = false; isWriteTxn = new AtomicBoolean(false); isMetadataTxn = false; - indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>(); + indexMap = new HashMap<>(); primaryIndex = null; tempResourceIdForRegister = new MutableLong(); logRecord = new LogRecord(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java index b08ecbb..3e79e1d 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java @@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.transactions.DatasetId; import org.apache.asterix.common.transactions.ITransactionContext; import org.apache.asterix.common.transactions.ITransactionManager; +import org.apache.asterix.common.transactions.ITransactionSubsystem; import org.apache.asterix.common.transactions.JobId; import org.apache.asterix.common.transactions.LogRecord; import org.apache.asterix.common.utils.TransactionUtil; @@ -43,11 +44,11 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon public static final boolean IS_DEBUG_MODE = false;//true private static final Logger LOGGER = Logger.getLogger(TransactionManager.class.getName()); - private final TransactionSubsystem txnSubsystem; - private Map<JobId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<JobId, ITransactionContext>(); + private final ITransactionSubsystem txnSubsystem; + private Map<JobId, ITransactionContext> transactionContextRepository = new ConcurrentHashMap<>(); private AtomicInteger maxJobId = new AtomicInteger(0); - public TransactionManager(TransactionSubsystem provider) { + public TransactionManager(ITransactionSubsystem provider) { this.txnSubsystem = provider; } @@ -91,7 +92,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon synchronized (this) { txnCtx = transactionContextRepository.get(jobId); if (txnCtx == null) { - txnCtx = new TransactionContext(jobId, txnSubsystem); + txnCtx = new TransactionContext(jobId); transactionContextRepository.put(jobId, txnCtx); } } @@ -103,7 +104,8 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon } @Override - public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) throws ACIDException { + public void commitTransaction(ITransactionContext txnCtx, DatasetId datasetId, int PKHashVal) + throws ACIDException { //Only job-level commits call this method. try { if (txnCtx.isWriteTxn()) { @@ -134,7 +136,7 @@ public class TransactionManager implements ITransactionManager, ILifeCycleCompon } @Override - public TransactionSubsystem getTransactionProvider() { + public ITransactionSubsystem getTransactionSubsystem() { return txnSubsystem; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java deleted file mode 100644 index 09183fe..0000000 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java +++ /dev/null @@ -1,200 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.transaction.management.service.transaction; - -import java.util.concurrent.Callable; -import java.util.concurrent.Future; -import java.util.logging.Logger; - -import org.apache.asterix.common.config.ClusterProperties; -import org.apache.asterix.common.config.IPropertiesProvider; -import org.apache.asterix.common.config.ReplicationProperties; -import org.apache.asterix.common.config.TransactionProperties; -import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.transactions.Checkpoint; -import org.apache.asterix.common.transactions.CheckpointProperties; -import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; -import org.apache.asterix.common.transactions.ICheckpointManager; -import org.apache.asterix.common.transactions.ILockManager; -import org.apache.asterix.common.transactions.ILogManager; -import org.apache.asterix.common.transactions.IRecoveryManager; -import org.apache.asterix.common.transactions.ITransactionManager; -import org.apache.asterix.common.transactions.ITransactionSubsystem; -import org.apache.asterix.common.utils.StorageConstants; -import org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager; -import org.apache.asterix.transaction.management.service.logging.LogManager; -import org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication; -import org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory; -import org.apache.asterix.transaction.management.service.recovery.RecoveryManager; - -/** - * Provider for all the sub-systems (transaction/lock/log/recovery) managers. - * Users of transaction sub-systems must obtain them from the provider. - */ -public class TransactionSubsystem implements ITransactionSubsystem { - private final String id; - private final ILogManager logManager; - private final ILockManager lockManager; - private final ITransactionManager transactionManager; - private final IRecoveryManager recoveryManager; - private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider; - private final TransactionProperties txnProperties; - private final ICheckpointManager checkpointManager; - - //for profiling purpose - public static final boolean IS_PROFILE_MODE = false;//true - public long profilerEntityCommitLogCount = 0; - private EntityCommitProfiler ecp; - private Future<Object> fecp; - - public TransactionSubsystem(String id, IAppRuntimeContextProvider asterixAppRuntimeContextProvider, - TransactionProperties txnProperties) throws ACIDException { - this.asterixAppRuntimeContextProvider = asterixAppRuntimeContextProvider; - this.id = id; - this.txnProperties = txnProperties; - this.transactionManager = new TransactionManager(this); - this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); - final boolean replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled(); - final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id); - checkpointManager = CheckpointManagerFactory.create(this, checkpointProperties, replicationEnabled); - final Checkpoint latestCheckpoint = checkpointManager.getLatest(); - if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() != StorageConstants.VERSION) { - throw new IllegalStateException( - String.format("Storage version mismatch. Current version (%s). On disk version: (%s)", - latestCheckpoint.getStorageVersion(), StorageConstants.VERSION)); - } - - ReplicationProperties asterixReplicationProperties = null; - if (asterixAppRuntimeContextProvider != null) { - asterixReplicationProperties = ((IPropertiesProvider) asterixAppRuntimeContextProvider - .getAppContext()).getReplicationProperties(); - } - - if (asterixReplicationProperties != null && replicationEnabled) { - this.logManager = new LogManagerWithReplication(this); - } else { - this.logManager = new LogManager(this); - } - this.recoveryManager = new RecoveryManager(this); - - if (IS_PROFILE_MODE) { - ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval()); - fecp = (Future<Object>) getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp); - } - } - - @Override - public ILogManager getLogManager() { - return logManager; - } - - @Override - public ILockManager getLockManager() { - return lockManager; - } - - @Override - public ITransactionManager getTransactionManager() { - return transactionManager; - } - - @Override - public IRecoveryManager getRecoveryManager() { - return recoveryManager; - } - - @Override - public IAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() { - return asterixAppRuntimeContextProvider; - } - - public TransactionProperties getTransactionProperties() { - return txnProperties; - } - - @Override - public String getId() { - return id; - } - - public void incrementEntityCommitCount() { - ++profilerEntityCommitLogCount; - } - - @Override - public ICheckpointManager getCheckpointManager() { - return checkpointManager; - } - - /** - * Thread for profiling entity level commit count - * This thread takes a report interval (in seconds) parameter and - * reports entity level commit count every report interval (in seconds) - * only if IS_PROFILE_MODE is set to true. - * However, the thread doesn't start reporting the count until the entityCommitCount > 0. - */ - static class EntityCommitProfiler implements Callable<Boolean> { - private static final Logger LOGGER = Logger.getLogger(EntityCommitProfiler.class.getName()); - private final long reportIntervalInMillisec; - private long lastEntityCommitCount; - private int reportIntervalInSeconds; - private TransactionSubsystem txnSubsystem; - private boolean firstReport = true; - private long startTimeStamp = 0; - private long reportRound = 1; - - public EntityCommitProfiler(TransactionSubsystem txnSubsystem, int reportIntervalInSeconds) { - Thread.currentThread().setName("EntityCommitProfiler-Thread"); - this.txnSubsystem = txnSubsystem; - this.reportIntervalInSeconds = reportIntervalInSeconds; - this.reportIntervalInMillisec = reportIntervalInSeconds * 1000; - lastEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount; - } - - @Override - public Boolean call() throws Exception { - while (true) { - Thread.sleep(reportIntervalInMillisec); - if (txnSubsystem.profilerEntityCommitLogCount > 0) { - if (firstReport) { - startTimeStamp = System.currentTimeMillis(); - firstReport = false; - } - //output the count - outputCount(); - } - } - } - - private void outputCount() { - long currentTimeStamp = System.currentTimeMillis(); - long currentEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount; - - LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound + "], AbsoluteTimeStamp[" + currentTimeStamp - + "], ActualRelativeTimeStamp[" + (currentTimeStamp - startTimeStamp) - + "], ExpectedRelativeTimeStamp[" + (reportIntervalInSeconds * reportRound) + "], IIPS[" - + ((currentEntityCommitCount - lastEntityCommitCount) / reportIntervalInSeconds) + "], IPS[" - + (currentEntityCommitCount / (reportRound * reportIntervalInSeconds)) + "]"); - - lastEntityCommitCount = currentEntityCommitCount; - ++reportRound; - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java new file mode 100644 index 0000000..31726d2 --- /dev/null +++ b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.hyracks.algebricks.core.algebra.expressions; + +import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; +import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; +import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; +import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; +import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; +import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; + +public class ExpressionRuntimeProvider implements IExpressionRuntimeProvider { + private final ILogicalExpressionJobGen lejg; + + public ExpressionRuntimeProvider(ILogicalExpressionJobGen lejg) { + this.lejg = lejg; + } + + @Override + public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, + IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { + return lejg.createEvaluatorFactory(expr, env, inputSchemas, context); + } + + @Override + public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, + IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) + throws AlgebricksException { + return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context); + } + + @Override + public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory( + AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, + JobGenContext context) throws AlgebricksException { + return lejg.createSerializableAggregateFunctionFactory(expr, env, inputSchemas, context); + } + + @Override + public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, + IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) + throws AlgebricksException { + return lejg.createRunningAggregateFunctionFactory(expr, env, inputSchemas, context); + } + + @Override + public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, + IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) + throws AlgebricksException { + return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context); + } +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java deleted file mode 100644 index 5b07a7e..0000000 --- a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.hyracks.algebricks.core.algebra.expressions; - -import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; -import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression; -import org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema; -import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext; -import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory; -import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory; - -public class LogicalExpressionJobGenToExpressionRuntimeProviderAdapter implements IExpressionRuntimeProvider { - private final ILogicalExpressionJobGen lejg; - - public LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(ILogicalExpressionJobGen lejg) { - this.lejg = lejg; - } - - @Override - public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression expr, IVariableTypeEnvironment env, - IOperatorSchema[] inputSchemas, JobGenContext context) throws AlgebricksException { - return lejg.createEvaluatorFactory(expr, env, inputSchemas, context); - } - - @Override - public IAggregateEvaluatorFactory createAggregateFunctionFactory(AggregateFunctionCallExpression expr, - IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) - throws AlgebricksException { - return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, context); - } - - @Override - public ISerializedAggregateEvaluatorFactory createSerializableAggregateFunctionFactory( - AggregateFunctionCallExpression expr, IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, - JobGenContext context) throws AlgebricksException { - return lejg.createSerializableAggregateFunctionFactory(expr, env, inputSchemas, context); - } - - @Override - public IRunningAggregateEvaluatorFactory createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr, - IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) - throws AlgebricksException { - return lejg.createRunningAggregateFunctionFactory(expr, env, inputSchemas, context); - } - - @Override - public IUnnestingEvaluatorFactory createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr, - IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, JobGenContext context) - throws AlgebricksException { - return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, context); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java index bdeb018..0acb40a 100644 --- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java @@ -21,7 +21,19 @@ package org.apache.hyracks.algebricks.data; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory; +/** + * Provides {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} for different types + */ +@FunctionalInterface public interface IBinaryComparatorFactoryProvider { - public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) - throws AlgebricksException; + /** + * @param type + * the type of the binary data + * @param ascending + * the order direction. true if ascending order is desired, false otherwise + * @return the appropriate {@link org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} instance + * @throws AlgebricksException + * if the comparator factory for the passed type could not be created + */ + IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean ascending) throws AlgebricksException; } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java index 3bac25d..1e502b3 100644 --- a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java +++ b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java @@ -20,6 +20,10 @@ package org.apache.hyracks.algebricks.data; import org.apache.hyracks.api.dataflow.value.ITypeTraits; +/** + * Provides {@link org.apache.hyracks.api.dataflow.value.ITypeTraits} for different data types + */ +@FunctionalInterface public interface ITypeTraitProvider { - public ITypeTraits getTypeTrait(Object type); + ITypeTraits getTypeTrait(Object type); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java index 0ac08c3..c8b6e59 100644 --- a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java +++ b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java @@ -44,7 +44,7 @@ import org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable; import org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer; import org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment; -import org.apache.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter; +import org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider; import org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression; import org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression; import org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions; @@ -153,7 +153,7 @@ public class PigletCompiler { }); builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE); builder.setExpressionRuntimeProvider( - new LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(new PigletExpressionJobGen())); + new ExpressionRuntimeProvider(new PigletExpressionJobGen())); builder.setExpressionTypeComputer(new IExpressionTypeComputer() { @Override public Object getType(ILogicalExpression expr, IMetadataProvider<?, ?> metadataProvider, http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java index 9463982..8f005d8 100644 --- a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java +++ b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java @@ -28,7 +28,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.util.HyracksConstants; import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender; -import org.apache.hyracks.dataflow.common.util.TaskUtils; +import org.apache.hyracks.dataflow.common.utils.TaskUtil; import org.apache.hyracks.util.IntSerDeUtils; /** @@ -112,7 +112,7 @@ public class MessagingFrameTupleAppender extends FrameTupleAppender { @Override public void write(IFrameWriter outWriter, boolean clearFrame) throws HyracksDataException { if (!initialized) { - message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx); + message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, ctx); initialized = true; } // If message fits, we append it, otherwise, we append a null message, then send a message only
