http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
deleted file mode 100644
index f8b6384..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ /dev/null
@@ -1,863 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.recovery;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.logging.Level;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.ioopcallbacks.AbstractLSMIOOperationCallback;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
-import org.apache.asterix.common.transactions.Checkpoint;
-import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
-import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.ILogReader;
-import org.apache.asterix.common.transactions.ILogRecord;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.ITransactionContext;
-import org.apache.asterix.common.transactions.LogType;
-import org.apache.asterix.common.transactions.Resource;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.asterix.transaction.management.service.logging.LogManager;
-import 
org.apache.asterix.transaction.management.service.transaction.TransactionManagementConstants;
-import 
org.apache.asterix.transaction.management.service.transaction.TransactionSubsystem;
-import org.apache.commons.io.FileUtils;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.hyracks.api.lifecycle.ILifeCycleComponent;
-import org.apache.hyracks.storage.am.common.api.IIndex;
-import org.apache.hyracks.storage.am.common.impls.NoOpOperationCallback;
-import org.apache.hyracks.storage.am.common.ophelpers.IndexOperation;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndex;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIndexAccessor;
-import org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndex;
-import org.apache.hyracks.storage.common.file.LocalResource;
-
/**
 * This is the Recovery Manager and is responsible for rolling back a
 * transaction as well as doing a system recovery.
 */
public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent {

    // Enables verbose per-log-record tracing in the analysis/redo/undo loops.
    public static final boolean IS_DEBUG_MODE = false;
    private static final Logger LOGGER = Logger.getLogger(RecoveryManager.class.getName());
    private final TransactionSubsystem txnSubsystem;
    private final LogManager logMgr;
    // Cached once at construction; replication mode selects the recovery strategy.
    private final boolean replicationEnabled;
    // Directory (under the transaction log dir) used to spill entity commits during recovery.
    private static final String RECOVERY_FILES_DIR_NAME = "recovery_temp";
    // jobId -> its committed entities; populated by the analysis phase, consulted by redo.
    private Map<Integer, JobEntityCommits> jobId2WinnerEntitiesMap = null;
    // Per-job memory budget for cached entity commits before they are spilled to disk.
    private final long cachedEntityCommitsPerJobSize;
    private final PersistentLocalResourceRepository localResourceRepository;
    private final ICheckpointManager checkpointManager;
    // Last system state computed by getSystemState()/startRecovery().
    private SystemState state;
-    public RecoveryManager(TransactionSubsystem txnSubsystem) {
-        this.txnSubsystem = txnSubsystem;
-        logMgr = (LogManager) txnSubsystem.getLogManager();
-        replicationEnabled = ClusterProperties.INSTANCE.isReplicationEnabled();
-        localResourceRepository = (PersistentLocalResourceRepository) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getLocalResourceRepository();
-        cachedEntityCommitsPerJobSize = 
txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
-        checkpointManager = txnSubsystem.getCheckpointManager();
-    }
-
-    /**
-     * returns system state which could be one of the three states: HEALTHY, 
RECOVERING, CORRUPTED.
-     * This state information could be used in a case where more than one 
thread is running
-     * in the bootstrap process to provide higher availability. In other 
words, while the system
-     * is recovered, another thread may start a new transaction with 
understanding the side effect
-     * of the operation, or the system can be recovered concurrently. This 
kind of concurrency is
-     * not supported, yet.
-     */
-    @Override
-    public SystemState getSystemState() throws ACIDException {
-        //read checkpoint file
-        Checkpoint checkpointObject = checkpointManager.getLatest();
-        if (checkpointObject == null) {
-            //The checkpoint file doesn't exist => Failure happened during NC 
initialization.
-            //Retry to initialize the NC by setting the state to NEW_UNIVERSE
-            state = SystemState.NEW_UNIVERSE;
-            if (LOGGER.isLoggable(Level.INFO)) {
-                LOGGER.info("The checkpoint file doesn't exist: systemState = 
NEW_UNIVERSE");
-            }
-            return state;
-        }
-
-        if (replicationEnabled) {
-            if (checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
-                //no logs exist
-                state = SystemState.HEALTHY;
-                return state;
-            } else if (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN() && checkpointObject.isSharp()) {
-                //only remote logs exist
-                state = SystemState.HEALTHY;
-                return state;
-            } else {
-                //need to perform remote recovery
-                state = SystemState.CORRUPTED;
-                return state;
-            }
-        } else {
-            long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-            if (logMgr.getAppendLSN() == readableSmallestLSN) {
-                if (checkpointObject.getMinMCTFirstLsn() != 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
-                    LOGGER.warning("Some(or all) of transaction log files are 
lost.");
-                    //No choice but continuing when the log files are lost.
-                }
-                state = SystemState.HEALTHY;
-                return state;
-            } else if (checkpointObject.getCheckpointLsn() == 
logMgr.getAppendLSN()
-                    && checkpointObject.getMinMCTFirstLsn() == 
AbstractCheckpointManager.SHARP_CHECKPOINT_LSN) {
-                state = SystemState.HEALTHY;
-                return state;
-            } else {
-                state = SystemState.CORRUPTED;
-                return state;
-            }
-        }
-    }
-
-    //This method is used only when replication is disabled.
-    @Override
-    public void startRecovery(boolean synchronous) throws IOException, 
ACIDException {
-        state = SystemState.RECOVERING;
-        LOGGER.log(Level.INFO, "starting recovery ...");
-
-        long readableSmallestLSN = logMgr.getReadableSmallestLSN();
-        Checkpoint checkpointObject = checkpointManager.getLatest();
-        long lowWaterMarkLSN = checkpointObject.getMinMCTFirstLsn();
-        if (lowWaterMarkLSN < readableSmallestLSN) {
-            lowWaterMarkLSN = readableSmallestLSN;
-        }
-
-        //delete any recovery files from previous failed recovery attempts
-        deleteRecoveryTemporaryFiles();
-
-        //get active partitions on this node
-        Set<Integer> activePartitions = 
localResourceRepository.getNodeOrignalPartitions();
-        replayPartitionsLogs(activePartitions, logMgr.getLogReader(true), 
lowWaterMarkLSN);
-    }
-
-    @Override
-    public synchronized void replayPartitionsLogs(Set<Integer> partitions, 
ILogReader logReader, long lowWaterMarkLSN)
-            throws IOException, ACIDException {
-        try {
-            Set<Integer> winnerJobSet = 
startRecoverysAnalysisPhase(partitions, logReader, lowWaterMarkLSN);
-            startRecoveryRedoPhase(partitions, logReader, lowWaterMarkLSN, 
winnerJobSet);
-        } finally {
-            logReader.close();
-            deleteRecoveryTemporaryFiles();
-        }
-    }
-
-    private synchronized Set<Integer> startRecoverysAnalysisPhase(Set<Integer> 
partitions, ILogReader logReader,
-            long lowWaterMarkLSN) throws IOException, ACIDException {
-        int updateLogCount = 0;
-        int entityCommitLogCount = 0;
-        int jobCommitLogCount = 0;
-        int abortLogCount = 0;
-        int jobId = -1;
-
-        Set<Integer> winnerJobSet = new HashSet<>();
-        jobId2WinnerEntitiesMap = new HashMap<>();
-
-        //set log reader to the lowWaterMarkLsn
-        ILogRecord logRecord = null;
-        logReader.initializeScan(lowWaterMarkLSN);
-
-        //collect all committed Lsn
-        JobEntityCommits jobEntityWinners = null;
-
-        logRecord = logReader.next();
-        while (logRecord != null) {
-            if (IS_DEBUG_MODE) {
-                LOGGER.info(logRecord.getLogRecordForDisplay());
-            }
-            switch (logRecord.getLogType()) {
-                case LogType.UPDATE:
-                    if (partitions.contains(logRecord.getResourcePartition())) 
{
-                        updateLogCount++;
-                    }
-                    break;
-                case LogType.JOB_COMMIT:
-                    jobId = logRecord.getJobId();
-                    winnerJobSet.add(jobId);
-                    if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                        jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
-                        //to delete any spilled files as well
-                        jobEntityWinners.clear();
-                        jobId2WinnerEntitiesMap.remove(jobId);
-                    }
-                    jobCommitLogCount++;
-                    break;
-                case LogType.ENTITY_COMMIT:
-                case LogType.UPSERT_ENTITY_COMMIT:
-                    if (partitions.contains(logRecord.getResourcePartition())) 
{
-                        jobId = logRecord.getJobId();
-                        if (!jobId2WinnerEntitiesMap.containsKey(jobId)) {
-                            jobEntityWinners = new JobEntityCommits(jobId);
-                            if (needToFreeMemory()) {
-                                //if we don't have enough memory for one more 
job, we will force all jobs to spill their cached entities to disk.
-                                //This could happen only when we have many 
jobs with small number of records and none of them have job commit.
-                                freeJobsCachedEntities(jobId);
-                            }
-                            jobId2WinnerEntitiesMap.put(jobId, 
jobEntityWinners);
-                        } else {
-                            jobEntityWinners = 
jobId2WinnerEntitiesMap.get(jobId);
-                        }
-                        jobEntityWinners.add(logRecord);
-                        entityCommitLogCount++;
-                    }
-                    break;
-                case LogType.ABORT:
-                    abortLogCount++;
-                    break;
-                case LogType.FLUSH:
-                case LogType.WAIT:
-                case LogType.MARKER:
-                    break;
-                default:
-                    throw new ACIDException("Unsupported LogType: " + 
logRecord.getLogType());
-            }
-            logRecord = logReader.next();
-        }
-
-        //prepare winners for search after analysis is done to flush anything 
remaining in memory to disk.
-        for (JobEntityCommits winners : jobId2WinnerEntitiesMap.values()) {
-            winners.prepareForSearch();
-        }
-
-        LOGGER.info("Logs analysis phase completed.");
-        LOGGER.info("Analysis log count update/entityCommit/jobCommit/abort = 
" + updateLogCount + "/"
-                + entityCommitLogCount + "/" + jobCommitLogCount + "/" + 
abortLogCount);
-
-        return winnerJobSet;
-    }
-
    /**
     * Redo phase of recovery: rescans the log from {@code lowWaterMarkLSN} and reapplies
     * every UPDATE record belonging to a winner job (or winner entity) on the given
     * partitions, but only when the record's LSN is newer than the on-disk LSN of the
     * target index. Indexes opened here are closed again in the finally block.
     *
     * @param partitions      partitions being recovered on this node
     * @param logReader       log reader, re-initialized to lowWaterMarkLSN
     * @param lowWaterMarkLSN scan start position
     * @param winnerJobSet    job ids that reached JOB_COMMIT (from the analysis phase)
     */
    private synchronized void startRecoveryRedoPhase(Set<Integer> partitions, ILogReader logReader,
            long lowWaterMarkLSN, Set<Integer> winnerJobSet) throws IOException, ACIDException {
        int redoCount = 0;
        int jobId = -1;

        long resourceId;
        long maxDiskLastLsn;
        long LSN = -1;
        ILSMIndex index = null;
        LocalResource localResource = null;
        Resource localResourceMetadata = null;
        boolean foundWinner = false;
        JobEntityCommits jobEntityWinners = null;

        IAppRuntimeContextProvider appRuntimeContext = txnSubsystem.getAsterixAppRuntimeContextProvider();
        IDatasetLifecycleManager datasetLifecycleManager = appRuntimeContext.getDatasetLifecycleManager();

        Map<Long, LocalResource> resourcesMap = localResourceRepository.loadAndGetAllResources();
        // resourceId -> max LSN already durable on disk for that index (redo cut-off).
        Map<Long, Long> resourceId2MaxLSNMap = new HashMap<>();
        // Reusable key object to avoid allocating a TxnId per log record.
        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);

        ILogRecord logRecord = null;
        try {
            logReader.initializeScan(lowWaterMarkLSN);
            logRecord = logReader.next();
            while (logRecord != null) {
                if (IS_DEBUG_MODE) {
                    LOGGER.info(logRecord.getLogRecordForDisplay());
                }
                LSN = logRecord.getLSN();
                jobId = logRecord.getJobId();
                foundWinner = false;
                switch (logRecord.getLogType()) {
                    case LogType.UPDATE:
                        if (partitions.contains(logRecord.getResourcePartition())) {
                            // A record wins if its whole job committed, or its specific
                            // entity committed (checked against the analysis-phase map).
                            if (winnerJobSet.contains(jobId)) {
                                foundWinner = true;
                            } else if (jobId2WinnerEntitiesMap.containsKey(jobId)) {
                                jobEntityWinners = jobId2WinnerEntitiesMap.get(jobId);
                                tempKeyTxnId.setTxnId(jobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
                                        logRecord.getPKValue(), logRecord.getPKValueSize());
                                if (jobEntityWinners.containsEntityCommitForTxnId(LSN, tempKeyTxnId)) {
                                    foundWinner = true;
                                }
                            }
                            if (foundWinner) {
                                resourceId = logRecord.getResourceId();
                                localResource = resourcesMap.get(resourceId);
                                /*******************************************************************
                                 * [Notice]
                                 * -> Issue
                                 * Delete index may cause a problem during redo.
                                 * The index operation to be redone couldn't be redone because the corresponding index
                                 * may not exist in NC due to the possible index drop DDL operation.
                                 * -> Approach
                                 * Avoid the problem during redo.
                                 * More specifically, the problem will be detected when the localResource of
                                 * the corresponding index is retrieved, which will end up with 'null'.
                                 * If null is returned, then just go and process the next
                                 * log record.
                                 *******************************************************************/
                                if (localResource == null) {
                                    logRecord = logReader.next();
                                    continue;
                                }
                                /*******************************************************************/

                                //get index instance from IndexLifeCycleManager
                                //if index is not registered into IndexLifeCycleManager,
                                //create the index using LocalMetadata stored in LocalResourceRepository
                                //get partition path in this node
                                localResourceMetadata = (Resource) localResource.getResource();
                                index = (ILSMIndex) datasetLifecycleManager.get(localResource.getPath());
                                if (index == null) {
                                    //#. create index instance and register to indexLifeCycleManager
                                    index = localResourceMetadata.createIndexInstance(appRuntimeContext, localResource);
                                    datasetLifecycleManager.register(localResource.getPath(), index);
                                    datasetLifecycleManager.open(localResource.getPath());

                                    //#. get maxDiskLastLSN
                                    ILSMIndex lsmIndex = index;
                                    try {
                                        maxDiskLastLsn = ((AbstractLSMIOOperationCallback) lsmIndex
                                                .getIOOperationCallback())
                                                        .getComponentLSN(lsmIndex.getImmutableComponents());
                                    } catch (HyracksDataException e) {
                                        // Close what we just opened before propagating.
                                        datasetLifecycleManager.close(localResource.getPath());
                                        throw e;
                                    }

                                    //#. set resourceId and maxDiskLastLSN to the map
                                    resourceId2MaxLSNMap.put(resourceId, maxDiskLastLsn);
                                } else {
                                    // NOTE(review): if an index is already registered with the
                                    // lifecycle manager but was never seen by THIS loop, this
                                    // get() returns null and the unboxing throws NPE — confirm
                                    // that all indexes are closed before recovery starts.
                                    maxDiskLastLsn = resourceId2MaxLSNMap.get(resourceId);
                                }

                                // Only redo records newer than what is already on disk.
                                if (LSN > maxDiskLastLsn) {
                                    redo(logRecord, datasetLifecycleManager);
                                    redoCount++;
                                }
                            }
                        }
                        break;
                    case LogType.JOB_COMMIT:
                    case LogType.ENTITY_COMMIT:
                    case LogType.ABORT:
                    case LogType.FLUSH:
                    case LogType.UPSERT_ENTITY_COMMIT:
                    case LogType.WAIT:
                    case LogType.MARKER:
                        //do nothing
                        break;
                    default:
                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
                }
                logRecord = logReader.next();
            }
            LOGGER.info("Logs REDO phase completed. Redo logs count: " + redoCount);
        } finally {
            //close all indexes
            Set<Long> resourceIdList = resourceId2MaxLSNMap.keySet();
            for (long r : resourceIdList) {
                datasetLifecycleManager.close(resourcesMap.get(r).getPath());
            }
        }
    }
-
-    private boolean needToFreeMemory() {
-        return Runtime.getRuntime().freeMemory() < 
cachedEntityCommitsPerJobSize;
-    }
-
-    @Override
-    public long getMinFirstLSN() throws HyracksDataException {
-        long minFirstLSN = getLocalMinFirstLSN();
-
-        //if replication is enabled, consider replica resources min LSN
-        if (replicationEnabled) {
-            long remoteMinFirstLSN = getRemoteMinFirstLSN();
-            minFirstLSN = Math.min(minFirstLSN, remoteMinFirstLSN);
-        }
-
-        return minFirstLSN;
-    }
-
-    @Override
-    public long getLocalMinFirstLSN() throws HyracksDataException {
-        IDatasetLifecycleManager datasetLifecycleManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getDatasetLifecycleManager();
-        List<IIndex> openIndexList = 
datasetLifecycleManager.getOpenResources();
-        long firstLSN;
-        //the min first lsn can only be the current append or smaller
-        long minFirstLSN = logMgr.getAppendLSN();
-        if (openIndexList.size() > 0) {
-            for (IIndex index : openIndexList) {
-                AbstractLSMIOOperationCallback ioCallback = 
(AbstractLSMIOOperationCallback) ((ILSMIndex) index)
-                        .getIOOperationCallback();
-
-                if (!((AbstractLSMIndex) 
index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
-                    firstLSN = ioCallback.getFirstLSN();
-                    minFirstLSN = Math.min(minFirstLSN, firstLSN);
-                }
-            }
-        }
-        return minFirstLSN;
-    }
-
-    private long getRemoteMinFirstLSN() {
-        IReplicaResourcesManager remoteResourcesManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                .getAppContext().getReplicaResourcesManager();
-        long minRemoteLSN = 
remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
-        return minRemoteLSN;
-    }
-
-    @Override
-    public File createJobRecoveryFile(int jobId, String fileName) throws 
IOException {
-        String recoveryDirPath = getRecoveryDirPath();
-        Path JobRecoveryFolder = Paths.get(recoveryDirPath + File.separator + 
jobId);
-        if (!Files.exists(JobRecoveryFolder)) {
-            Files.createDirectories(JobRecoveryFolder);
-        }
-
-        File jobRecoveryFile = new File(JobRecoveryFolder.toString() + 
File.separator + fileName);
-        if (!jobRecoveryFile.exists()) {
-            jobRecoveryFile.createNewFile();
-        } else {
-            throw new IOException("File: " + fileName + " for job id(" + jobId 
+ ") already exists");
-        }
-
-        return jobRecoveryFile;
-    }
-
-    @Override
-    public void deleteRecoveryTemporaryFiles() {
-        String recoveryDirPath = getRecoveryDirPath();
-        Path recoveryFolderPath = Paths.get(recoveryDirPath);
-        FileUtils.deleteQuietly(recoveryFolderPath.toFile());
-    }
-
-    private String getRecoveryDirPath() {
-        String logDir = logMgr.getLogManagerProperties().getLogDir();
-        if (!logDir.endsWith(File.separator)) {
-            logDir += File.separator;
-        }
-
-        return logDir + RECOVERY_FILES_DIR_NAME;
-    }
-
-    private void freeJobsCachedEntities(int requestingJobId) throws 
IOException {
-        if (jobId2WinnerEntitiesMap != null) {
-            for (Entry<Integer, JobEntityCommits> jobEntityCommits : 
jobId2WinnerEntitiesMap.entrySet()) {
-                //if the job is not the requester, free its memory
-                if (jobEntityCommits.getKey() != requestingJobId) {
-                    jobEntityCommits.getValue().spillToDiskAndfreeMemory();
-                }
-            }
-        }
-    }
-
    /**
     * Rolls back a single transaction by scanning its log records from its first to last
     * LSN, collecting the loser entities (updates without a matching entity commit) on
     * this node's active partitions, then undoing their updates in reverse LSN order.
     *
     * @param txnContext context of the aborting transaction
     * @throws ACIDException on log inconsistencies or unsupported record types
     */
    @Override
    public void rollbackTransaction(ITransactionContext txnContext) throws ACIDException {
        int abortedJobId = txnContext.getJobId().getId();
        // Obtain the first/last log record LSNs written by the Job
        long firstLSN = txnContext.getFirstLSN();
        /**
         * The effect of any log record with LSN below minFirstLSN has already been written to disk and
         * will not be rolled back. Therefore, we will set the first LSN of the job to the maximum of
         * minFirstLSN and the job's first LSN.
         */
        try {
            long localMinFirstLSN = getLocalMinFirstLSN();
            firstLSN = Math.max(firstLSN, localMinFirstLSN);
        } catch (HyracksDataException e) {
            throw new ACIDException(e);
        }
        long lastLSN = txnContext.getLastLSN();
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("rollbacking transaction log records from " + firstLSN + " to " + lastLSN);
        }
        // check if the transaction actually wrote some logs.
        if (firstLSN == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN || firstLSN > lastLSN) {
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info(
                        "no need to roll back as there were no operations by the transaction " + txnContext.getJobId());
            }
            return;
        }

        // While reading log records from firstLsn to lastLsn, collect uncommitted txn's Lsns
        if (LOGGER.isLoggable(Level.INFO)) {
            LOGGER.info("collecting loser transaction's LSNs from " + firstLSN + " to " + lastLSN);
        }

        // loser entity -> LSNs of its UPDATE records that must be undone.
        Map<TxnId, List<Long>> jobLoserEntity2LSNsMap = new HashMap<>();
        // Reusable lookup key to avoid per-record allocation; a fresh TxnId is
        // allocated only when an entity is first inserted into the map.
        TxnId tempKeyTxnId = new TxnId(-1, -1, -1, null, -1, false);
        int updateLogCount = 0;
        int entityCommitLogCount = 0;
        int logJobId = -1;
        long currentLSN = -1;
        TxnId loserEntity = null;
        List<Long> undoLSNSet = null;
        //get active partitions on this node
        Set<Integer> activePartitions = localResourceRepository.getActivePartitions();
        ILogReader logReader = logMgr.getLogReader(false);
        try {
            logReader.initializeScan(firstLSN);
            ILogRecord logRecord = null;
            while (currentLSN < lastLSN) {
                logRecord = logReader.next();
                if (logRecord == null) {
                    break;
                } else {
                    currentLSN = logRecord.getLSN();

                    if (IS_DEBUG_MODE) {
                        LOGGER.info(logRecord.getLogRecordForDisplay());
                    }
                }
                // Skip records written by other jobs.
                logJobId = logRecord.getJobId();
                if (logJobId != abortedJobId) {
                    continue;
                }
                tempKeyTxnId.setTxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
                        logRecord.getPKValue(), logRecord.getPKValueSize());
                switch (logRecord.getLogType()) {
                    case LogType.UPDATE:
                        if (activePartitions.contains(logRecord.getResourcePartition())) {
                            undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnId);
                            if (undoLSNSet == null) {
                                loserEntity = new TxnId(logJobId, logRecord.getDatasetId(), logRecord.getPKHashValue(),
                                        logRecord.getPKValue(), logRecord.getPKValueSize(), true);
                                undoLSNSet = new LinkedList<>();
                                jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet);
                            }
                            undoLSNSet.add(currentLSN);
                            updateLogCount++;
                            if (IS_DEBUG_MODE) {
                                LOGGER.info(Thread.currentThread().getId() + "======> update[" + currentLSN + "]:"
                                        + tempKeyTxnId);
                            }
                        }
                        break;
                    case LogType.ENTITY_COMMIT:
                    case LogType.UPSERT_ENTITY_COMMIT:
                        // Entity committed: it is not a loser; drop its pending undo list.
                        if (activePartitions.contains(logRecord.getResourcePartition())) {
                            jobLoserEntity2LSNsMap.remove(tempKeyTxnId);
                            entityCommitLogCount++;
                            if (IS_DEBUG_MODE) {
                                LOGGER.info(Thread.currentThread().getId() + "======> entity_commit[" + currentLSN + "]"
                                        + tempKeyTxnId);
                            }
                        }
                        break;
                    case LogType.JOB_COMMIT:
                        // A committed job must never be rolled back.
                        throw new ACIDException("Unexpected LogType(" + logRecord.getLogType() + ") during abort.");
                    case LogType.ABORT:
                    case LogType.FLUSH:
                    case LogType.WAIT:
                    case LogType.MARKER:
                        //ignore
                        break;
                    default:
                        throw new ACIDException("Unsupported LogType: " + logRecord.getLogType());
                }
            }

            // Sanity check: the scan must end exactly at the job's last LSN.
            if (currentLSN != lastLSN) {
                throw new ACIDException("LastLSN mismatch: lastLSN(" + lastLSN + ") vs currentLSN(" + currentLSN
                        + ") during abort( " + txnContext.getJobId() + ")");
            }

            //undo loserTxn's effect
            LOGGER.log(Level.INFO, "undoing loser transaction's effect");

            IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider()
                    .getDatasetLifecycleManager();
            //TODO sort loser entities by smallest LSN to undo in one pass.
            Iterator<Entry<TxnId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator();
            int undoCount = 0;
            while (iter.hasNext()) {
                Map.Entry<TxnId, List<Long>> loserEntity2LSNsMap = iter.next();
                undoLSNSet = loserEntity2LSNsMap.getValue();
                // The step below is important since the upsert operations must be done in reverse order.
                Collections.reverse(undoLSNSet);
                for (long undoLSN : undoLSNSet) {
                    //here, all the log records are UPDATE type. So, we don't need to check the type again.
                    //read the corresponding log record to be undone.
                    logRecord = logReader.read(undoLSN);
                    if (logRecord == null) {
                        throw new ACIDException("IllegalState exception during abort( " + txnContext.getJobId() + ")");
                    }
                    if (IS_DEBUG_MODE) {
                        LOGGER.info(logRecord.getLogRecordForDisplay());
                    }
                    undo(logRecord, datasetLifecycleManager);
                    undoCount++;
                }
            }

            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("undone loser transaction's effect");
                LOGGER.info("[RecoveryManager's rollback log count] update/entityCommit/undo:" + updateLogCount + "/"
                        + entityCommitLogCount + "/" + undoCount);
            }
        } finally {
            logReader.close();
        }
    }
-
-    @Override
-    public void start() {
-        //no op
-    }
-
-    @Override
-    public void stop(boolean dumpState, OutputStream os) throws IOException {
-        // Shutdown checkpoint
-        checkpointManager.doSharpCheckpoint();
-    }
-
-    @Override
-    public void dumpState(OutputStream os) throws IOException {
-        // do nothing
-    }
-
-    private static void undo(ILogRecord logRecord, IDatasetLifecycleManager 
datasetLifecycleManager) {
-        try {
-            ILSMIndex index = (ILSMIndex) 
datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
-                    logRecord.getResourceId());
-            ILSMIndexAccessor indexAccessor = 
index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
-                indexAccessor.forceDelete(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == 
IndexOperation.DELETE.ordinal()) {
-                indexAccessor.forceInsert(logRecord.getNewValue());
-            } else {
-                throw new IllegalStateException("Unsupported OperationType: " 
+ logRecord.getNewOp());
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to undo", e);
-        }
-    }
-
-    private static void redo(ILogRecord logRecord, IDatasetLifecycleManager 
datasetLifecycleManager) {
-        try {
-            int datasetId = logRecord.getDatasetId();
-            long resourceId = logRecord.getResourceId();
-            ILSMIndex index = (ILSMIndex) 
datasetLifecycleManager.getIndex(datasetId, resourceId);
-            ILSMIndexAccessor indexAccessor = 
index.createAccessor(NoOpOperationCallback.INSTANCE,
-                    NoOpOperationCallback.INSTANCE);
-            if (logRecord.getNewOp() == IndexOperation.INSERT.ordinal()) {
-                indexAccessor.forceInsert(logRecord.getNewValue());
-            } else if (logRecord.getNewOp() == 
IndexOperation.DELETE.ordinal()) {
-                indexAccessor.forceDelete(logRecord.getNewValue());
-            } else {
-                throw new IllegalStateException("Unsupported OperationType: " 
+ logRecord.getNewOp());
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("Failed to redo", e);
-        }
-    }
-
-    private class JobEntityCommits {
-        private static final String PARTITION_FILE_NAME_SEPARATOR = "_";
-        private final int jobId;
-        private final Set<TxnId> cachedEntityCommitTxns = new HashSet<>();
-        private final List<File> jobEntitCommitOnDiskPartitionsFiles = new 
ArrayList<>();
-        //a flag indicating whether all the the commits for this jobs have 
been added.
-        private boolean preparedForSearch = false;
-        private TxnId winnerEntity = null;
-        private int currentPartitionSize = 0;
-        private long partitionMaxLSN = 0;
-        private String currentPartitonName;
-
-        public JobEntityCommits(int jobId) {
-            this.jobId = jobId;
-        }
-
-        public void add(ILogRecord logRecord) throws IOException {
-            if (preparedForSearch) {
-                throw new IOException("Cannot add new entity commits after 
preparing for search.");
-            }
-            winnerEntity = new TxnId(logRecord.getJobId(), 
logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                    logRecord.getPKValue(), logRecord.getPKValueSize(), true);
-            cachedEntityCommitTxns.add(winnerEntity);
-            //since log file is read sequentially, LSNs are always increasing
-            partitionMaxLSN = logRecord.getLSN();
-            currentPartitionSize += winnerEntity.getCurrentSize();
-            //if the memory budget for the current partition exceeded the 
limit, spill it to disk and free memory
-            if (currentPartitionSize >= cachedEntityCommitsPerJobSize) {
-                spillToDiskAndfreeMemory();
-            }
-        }
-
-        public void spillToDiskAndfreeMemory() throws IOException {
-            if (cachedEntityCommitTxns.size() > 0) {
-                if (!preparedForSearch) {
-                    writeCurrentPartitionToDisk();
-                }
-                cachedEntityCommitTxns.clear();
-                partitionMaxLSN = 0;
-                currentPartitionSize = 0;
-                currentPartitonName = "";
-            }
-        }
-
-        /**
-         * Call this method when no more entity commits will be added to this 
job.
-         *
-         * @throws IOException
-         */
-        public void prepareForSearch() throws IOException {
-            //if we have anything left in memory, we need to spill them to 
disk before searching other partitions.
-            //However, if we don't have anything on disk, we will search from 
memory only
-            if (jobEntitCommitOnDiskPartitionsFiles.size() > 0) {
-                spillToDiskAndfreeMemory();
-            } else {
-                //set the name of the current in memory partition to the 
current partition
-                currentPartitonName = getPartitionName(partitionMaxLSN);
-            }
-            preparedForSearch = true;
-        }
-
-        public boolean containsEntityCommitForTxnId(long logLSN, TxnId txnId) 
throws IOException {
-            //if we don't have any partitions on disk, search only from memory
-            if (jobEntitCommitOnDiskPartitionsFiles.size() == 0) {
-                return cachedEntityCommitTxns.contains(txnId);
-            } else {
-                //get candidate partitions from disk
-                ArrayList<File> candidatePartitions = 
getCandidiatePartitions(logLSN);
-                for (File partition : candidatePartitions) {
-                    if (serachPartition(partition, txnId)) {
-                        return true;
-                    }
-                }
-            }
-            return false;
-        }
-
-        /**
-         * @param logLSN
-         * @return partitions that have a max LSN > logLSN
-         */
-        public ArrayList<File> getCandidiatePartitions(long logLSN) {
-            ArrayList<File> candidiatePartitions = new ArrayList<>();
-            for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
-                String partitionName = partition.getName();
-                //entity commit log must come after the update log, therefore, 
consider only partitions with max LSN > logLSN
-                if (getPartitionMaxLSNFromName(partitionName) > logLSN) {
-                    candidiatePartitions.add(partition);
-                }
-            }
-
-            return candidiatePartitions;
-        }
-
-        public void clear() {
-            cachedEntityCommitTxns.clear();
-            for (File partition : jobEntitCommitOnDiskPartitionsFiles) {
-                partition.delete();
-            }
-            jobEntitCommitOnDiskPartitionsFiles.clear();
-        }
-
-        private boolean serachPartition(File partition, TxnId txnId) throws 
IOException {
-            //load partition from disk if it is not  already in memory
-            if (!partition.getName().equals(currentPartitonName)) {
-                loadPartitionToMemory(partition, cachedEntityCommitTxns);
-                currentPartitonName = partition.getName();
-            }
-            return cachedEntityCommitTxns.contains(txnId);
-        }
-
-        private String getPartitionName(long maxLSN) {
-            return jobId + PARTITION_FILE_NAME_SEPARATOR + maxLSN;
-        }
-
-        private long getPartitionMaxLSNFromName(String partitionName) {
-            return 
Long.valueOf(partitionName.substring(partitionName.indexOf(PARTITION_FILE_NAME_SEPARATOR)
 + 1));
-        }
-
-        private void writeCurrentPartitionToDisk() throws IOException {
-            //if we don't have enough memory to allocate for this partition, 
we will ask recovery manager to free memory
-            if (needToFreeMemory()) {
-                freeJobsCachedEntities(jobId);
-            }
-            //allocate a buffer that can hold the current partition
-            ByteBuffer buffer = ByteBuffer.allocate(currentPartitionSize);
-            for (Iterator<TxnId> iterator = cachedEntityCommitTxns.iterator(); 
iterator.hasNext();) {
-                TxnId txnId = iterator.next();
-                //serialize the object and remove it from memory
-                txnId.serialize(buffer);
-                iterator.remove();
-            }
-            //name partition file based on job id and max lsn
-            File partitionFile = createJobRecoveryFile(jobId, 
getPartitionName(partitionMaxLSN));
-            //write file to disk
-            try (FileOutputStream fileOutputstream = new 
FileOutputStream(partitionFile, false);
-                    FileChannel fileChannel = fileOutputstream.getChannel()) {
-                buffer.flip();
-                while (buffer.hasRemaining()) {
-                    fileChannel.write(buffer);
-                }
-            }
-            jobEntitCommitOnDiskPartitionsFiles.add(partitionFile);
-        }
-
-        private void loadPartitionToMemory(File partition, Set<TxnId> 
partitionTxn) throws IOException {
-            partitionTxn.clear();
-            //if we don't have enough memory to a load partition, we will ask 
recovery manager to free memory
-            if (needToFreeMemory()) {
-                freeJobsCachedEntities(jobId);
-            }
-            ByteBuffer buffer = ByteBuffer.allocateDirect((int) 
partition.length());
-            //load partition to memory
-            try (InputStream is = new FileInputStream(partition)) {
-                int readByte;
-                while ((readByte = is.read()) != -1) {
-                    buffer.put((byte) readByte);
-                }
-            }
-            buffer.flip();
-            TxnId temp = null;
-            while (buffer.remaining() != 0) {
-                temp = TxnId.deserialize(buffer);
-                partitionTxn.add(temp);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
deleted file mode 100644
index 8c6e253..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/AsterixRuntimeComponentsProvider.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import org.apache.asterix.common.api.IAppRuntimeContext;
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.hyracks.api.context.IHyracksTaskContext;
-import org.apache.hyracks.storage.am.common.api.IIndexLifecycleManagerProvider;
-import org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationScheduler;
-import 
org.apache.hyracks.storage.am.lsm.common.api.ILSMIOOperationSchedulerProvider;
-import org.apache.hyracks.storage.common.IStorageManagerInterface;
-import org.apache.hyracks.storage.common.buffercache.IBufferCache;
-import org.apache.hyracks.storage.common.file.IFileMapProvider;
-import org.apache.hyracks.storage.common.file.ILocalResourceRepository;
-import org.apache.hyracks.storage.common.file.IResourceIdFactory;
-
-public class AsterixRuntimeComponentsProvider implements 
IIndexLifecycleManagerProvider, IStorageManagerInterface,
-        ILSMIOOperationSchedulerProvider {
-
-    private static final long serialVersionUID = 1L;
-
-    public static final AsterixRuntimeComponentsProvider RUNTIME_PROVIDER = 
new AsterixRuntimeComponentsProvider();
-
-    private AsterixRuntimeComponentsProvider() {
-    }
-
-    @Override
-    public ILSMIOOperationScheduler getIOScheduler(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getLSMIOScheduler();
-    }
-
-    @Override
-    public IBufferCache getBufferCache(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getBufferCache();
-    }
-
-    @Override
-    public IFileMapProvider getFileMapProvider(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getFileMapManager();
-    }
-
-    @Override
-    public ILocalResourceRepository 
getLocalResourceRepository(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getLocalResourceRepository();
-    }
-
-    @Override
-    public IDatasetLifecycleManager getLifecycleManager(IHyracksTaskContext 
ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getDatasetLifecycleManager();
-    }
-
-    @Override
-    public IResourceIdFactory getResourceIdFactory(IHyracksTaskContext ctx) {
-        return ((IAppRuntimeContext) 
ctx.getJobletContext().getApplicationContext().getApplicationObject())
-                .getResourceIdFactory();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
index 215eb14..ad03a25 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -98,7 +98,7 @@ public class TransactionContext implements 
ITransactionContext, Serializable {
     // creations.
     // also, the pool can throttle the number of concurrent active jobs at 
every
     // moment.
-    public TransactionContext(JobId jobId, TransactionSubsystem 
transactionSubsystem) throws ACIDException {
+    public TransactionContext(JobId jobId) throws ACIDException {
         this.jobId = jobId;
         firstLSN = new AtomicLong(-1);
         lastLSN = new AtomicLong(-1);
@@ -106,7 +106,7 @@ public class TransactionContext implements 
ITransactionContext, Serializable {
         isTimeout = false;
         isWriteTxn = new AtomicBoolean(false);
         isMetadataTxn = false;
-        indexMap = new HashMap<MutableLong, AbstractLSMIOOperationCallback>();
+        indexMap = new HashMap<>();
         primaryIndex = null;
         tempResourceIdForRegister = new MutableLong();
         logRecord = new LogRecord();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
index b08ecbb..3e79e1d 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -30,6 +30,7 @@ import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionManager;
+import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.JobId;
 import org.apache.asterix.common.transactions.LogRecord;
 import org.apache.asterix.common.utils.TransactionUtil;
@@ -43,11 +44,11 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
 
     public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = 
Logger.getLogger(TransactionManager.class.getName());
-    private final TransactionSubsystem txnSubsystem;
-    private Map<JobId, ITransactionContext> transactionContextRepository = new 
ConcurrentHashMap<JobId, ITransactionContext>();
+    private final ITransactionSubsystem txnSubsystem;
+    private Map<JobId, ITransactionContext> transactionContextRepository = new 
ConcurrentHashMap<>();
     private AtomicInteger maxJobId = new AtomicInteger(0);
 
-    public TransactionManager(TransactionSubsystem provider) {
+    public TransactionManager(ITransactionSubsystem provider) {
         this.txnSubsystem = provider;
     }
 
@@ -91,7 +92,7 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
                 synchronized (this) {
                     txnCtx = transactionContextRepository.get(jobId);
                     if (txnCtx == null) {
-                        txnCtx = new TransactionContext(jobId, txnSubsystem);
+                        txnCtx = new TransactionContext(jobId);
                         transactionContextRepository.put(jobId, txnCtx);
                     }
                 }
@@ -103,7 +104,8 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
     }
 
     @Override
-    public void commitTransaction(ITransactionContext txnCtx, DatasetId 
datasetId, int PKHashVal) throws ACIDException {
+    public void commitTransaction(ITransactionContext txnCtx, DatasetId 
datasetId, int PKHashVal)
+            throws ACIDException {
         //Only job-level commits call this method.
         try {
             if (txnCtx.isWriteTxn()) {
@@ -134,7 +136,7 @@ public class TransactionManager implements 
ITransactionManager, ILifeCycleCompon
     }
 
     @Override
-    public TransactionSubsystem getTransactionProvider() {
+    public ITransactionSubsystem getTransactionSubsystem() {
         return txnSubsystem;
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
deleted file mode 100644
index 09183fe..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/transaction/TransactionSubsystem.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.transaction;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.Future;
-import java.util.logging.Logger;
-
-import org.apache.asterix.common.config.ClusterProperties;
-import org.apache.asterix.common.config.IPropertiesProvider;
-import org.apache.asterix.common.config.ReplicationProperties;
-import org.apache.asterix.common.config.TransactionProperties;
-import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.transactions.Checkpoint;
-import org.apache.asterix.common.transactions.CheckpointProperties;
-import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
-import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.ILockManager;
-import org.apache.asterix.common.transactions.ILogManager;
-import org.apache.asterix.common.transactions.IRecoveryManager;
-import org.apache.asterix.common.transactions.ITransactionManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import org.apache.asterix.common.utils.StorageConstants;
-import 
org.apache.asterix.transaction.management.service.locking.ConcurrentLockManager;
-import org.apache.asterix.transaction.management.service.logging.LogManager;
-import 
org.apache.asterix.transaction.management.service.logging.LogManagerWithReplication;
-import 
org.apache.asterix.transaction.management.service.recovery.CheckpointManagerFactory;
-import 
org.apache.asterix.transaction.management.service.recovery.RecoveryManager;
-
-/**
- * Provider for all the sub-systems (transaction/lock/log/recovery) managers.
- * Users of transaction sub-systems must obtain them from the provider.
- */
-public class TransactionSubsystem implements ITransactionSubsystem {
-    private final String id;
-    private final ILogManager logManager;
-    private final ILockManager lockManager;
-    private final ITransactionManager transactionManager;
-    private final IRecoveryManager recoveryManager;
-    private final IAppRuntimeContextProvider asterixAppRuntimeContextProvider;
-    private final TransactionProperties txnProperties;
-    private final ICheckpointManager checkpointManager;
-
-    //for profiling purpose
-    public static final boolean IS_PROFILE_MODE = false;//true
-    public long profilerEntityCommitLogCount = 0;
-    private EntityCommitProfiler ecp;
-    private Future<Object> fecp;
-
-    public TransactionSubsystem(String id, IAppRuntimeContextProvider 
asterixAppRuntimeContextProvider,
-            TransactionProperties txnProperties) throws ACIDException {
-        this.asterixAppRuntimeContextProvider = 
asterixAppRuntimeContextProvider;
-        this.id = id;
-        this.txnProperties = txnProperties;
-        this.transactionManager = new TransactionManager(this);
-        this.lockManager = new 
ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
-        final boolean replicationEnabled = 
ClusterProperties.INSTANCE.isReplicationEnabled();
-        final CheckpointProperties checkpointProperties = new 
CheckpointProperties(txnProperties, id);
-        checkpointManager = CheckpointManagerFactory.create(this, 
checkpointProperties, replicationEnabled);
-        final Checkpoint latestCheckpoint = checkpointManager.getLatest();
-        if (latestCheckpoint != null && latestCheckpoint.getStorageVersion() 
!= StorageConstants.VERSION) {
-            throw new IllegalStateException(
-                    String.format("Storage version mismatch. Current version 
(%s). On disk version: (%s)",
-                            latestCheckpoint.getStorageVersion(), 
StorageConstants.VERSION));
-        }
-
-        ReplicationProperties asterixReplicationProperties = null;
-        if (asterixAppRuntimeContextProvider != null) {
-            asterixReplicationProperties = ((IPropertiesProvider) 
asterixAppRuntimeContextProvider
-                    .getAppContext()).getReplicationProperties();
-        }
-
-        if (asterixReplicationProperties != null && replicationEnabled) {
-            this.logManager = new LogManagerWithReplication(this);
-        } else {
-            this.logManager = new LogManager(this);
-        }
-        this.recoveryManager = new RecoveryManager(this);
-
-        if (IS_PROFILE_MODE) {
-            ecp = new EntityCommitProfiler(this, 
this.txnProperties.getCommitProfilerReportInterval());
-            fecp = (Future<Object>) 
getAsterixAppRuntimeContextProvider().getThreadExecutor().submit(ecp);
-        }
-    }
-
-    @Override
-    public ILogManager getLogManager() {
-        return logManager;
-    }
-
-    @Override
-    public ILockManager getLockManager() {
-        return lockManager;
-    }
-
-    @Override
-    public ITransactionManager getTransactionManager() {
-        return transactionManager;
-    }
-
-    @Override
-    public IRecoveryManager getRecoveryManager() {
-        return recoveryManager;
-    }
-
-    @Override
-    public IAppRuntimeContextProvider getAsterixAppRuntimeContextProvider() {
-        return asterixAppRuntimeContextProvider;
-    }
-
-    public TransactionProperties getTransactionProperties() {
-        return txnProperties;
-    }
-
-    @Override
-    public String getId() {
-        return id;
-    }
-
-    public void incrementEntityCommitCount() {
-        ++profilerEntityCommitLogCount;
-    }
-
-    @Override
-    public ICheckpointManager getCheckpointManager() {
-        return checkpointManager;
-    }
-
-    /**
-     * Thread for profiling entity level commit count
-     * This thread takes a report interval (in seconds) parameter and
-     * reports entity level commit count every report interval (in seconds)
-     * only if IS_PROFILE_MODE is set to true.
-     * However, the thread doesn't start reporting the count until the 
entityCommitCount > 0.
-     */
-    static class EntityCommitProfiler implements Callable<Boolean> {
-        private static final Logger LOGGER = 
Logger.getLogger(EntityCommitProfiler.class.getName());
-        private final long reportIntervalInMillisec;
-        private long lastEntityCommitCount;
-        private int reportIntervalInSeconds;
-        private TransactionSubsystem txnSubsystem;
-        private boolean firstReport = true;
-        private long startTimeStamp = 0;
-        private long reportRound = 1;
-
-        public EntityCommitProfiler(TransactionSubsystem txnSubsystem, int 
reportIntervalInSeconds) {
-            Thread.currentThread().setName("EntityCommitProfiler-Thread");
-            this.txnSubsystem = txnSubsystem;
-            this.reportIntervalInSeconds = reportIntervalInSeconds;
-            this.reportIntervalInMillisec = reportIntervalInSeconds * 1000;
-            lastEntityCommitCount = txnSubsystem.profilerEntityCommitLogCount;
-        }
-
-        @Override
-        public Boolean call() throws Exception {
-            while (true) {
-                Thread.sleep(reportIntervalInMillisec);
-                if (txnSubsystem.profilerEntityCommitLogCount > 0) {
-                    if (firstReport) {
-                        startTimeStamp = System.currentTimeMillis();
-                        firstReport = false;
-                    }
-                    //output the count
-                    outputCount();
-                }
-            }
-        }
-
-        private void outputCount() {
-            long currentTimeStamp = System.currentTimeMillis();
-            long currentEntityCommitCount = 
txnSubsystem.profilerEntityCommitLogCount;
-
-            LOGGER.severe("EntityCommitProfiler ReportRound[" + reportRound + 
"], AbsoluteTimeStamp[" + currentTimeStamp
-                    + "], ActualRelativeTimeStamp[" + (currentTimeStamp - 
startTimeStamp)
-                    + "], ExpectedRelativeTimeStamp[" + 
(reportIntervalInSeconds * reportRound) + "], IIPS["
-                    + ((currentEntityCommitCount - lastEntityCommitCount) / 
reportIntervalInSeconds) + "], IPS["
-                    + (currentEntityCommitCount / (reportRound * 
reportIntervalInSeconds)) + "]");
-
-            lastEntityCommitCount = currentEntityCommitCount;
-            ++reportRound;
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java
new file mode 100644
index 0000000..31726d2
--- /dev/null
+++ 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/ExpressionRuntimeProvider.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.hyracks.algebricks.core.algebra.expressions;
+
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
+import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
+import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
+import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
+import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
+import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
+
+public class ExpressionRuntimeProvider implements IExpressionRuntimeProvider {
+    private final ILogicalExpressionJobGen lejg;
+
+    public ExpressionRuntimeProvider(ILogicalExpressionJobGen lejg) {
+        this.lejg = lejg;
+    }
+
+    @Override
+    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
+            IOperatorSchema[] inputSchemas, JobGenContext context) throws 
AlgebricksException {
+        return lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
+    }
+
+    @Override
+    public IAggregateEvaluatorFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
+                    throws AlgebricksException {
+        return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, 
context);
+    }
+
+    @Override
+    public ISerializedAggregateEvaluatorFactory 
createSerializableAggregateFunctionFactory(
+            AggregateFunctionCallExpression expr, IVariableTypeEnvironment 
env, IOperatorSchema[] inputSchemas,
+            JobGenContext context) throws AlgebricksException {
+        return lejg.createSerializableAggregateFunctionFactory(expr, env, 
inputSchemas, context);
+    }
+
+    @Override
+    public IRunningAggregateEvaluatorFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
+                    throws AlgebricksException {
+        return lejg.createRunningAggregateFunctionFactory(expr, env, 
inputSchemas, context);
+    }
+
+    @Override
+    public IUnnestingEvaluatorFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
+            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
+                    throws AlgebricksException {
+        return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, 
context);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
 
b/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
deleted file mode 100644
index 5b07a7e..0000000
--- 
a/hyracks-fullstack/algebricks/algebricks-core/src/main/java/org/apache/hyracks/algebricks/core/algebra/expressions/LogicalExpressionJobGenToExpressionRuntimeProviderAdapter.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.hyracks.algebricks.core.algebra.expressions;
-
-import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
-import org.apache.hyracks.algebricks.core.algebra.base.ILogicalExpression;
-import 
org.apache.hyracks.algebricks.core.algebra.operators.logical.IOperatorSchema;
-import org.apache.hyracks.algebricks.core.jobgen.impl.JobGenContext;
-import org.apache.hyracks.algebricks.runtime.base.IAggregateEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.ISerializedAggregateEvaluatorFactory;
-import 
org.apache.hyracks.algebricks.runtime.base.IRunningAggregateEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
-import org.apache.hyracks.algebricks.runtime.base.IUnnestingEvaluatorFactory;
-
-public class LogicalExpressionJobGenToExpressionRuntimeProviderAdapter 
implements IExpressionRuntimeProvider {
-    private final ILogicalExpressionJobGen lejg;
-
-    public 
LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(ILogicalExpressionJobGen
 lejg) {
-        this.lejg = lejg;
-    }
-
-    @Override
-    public IScalarEvaluatorFactory createEvaluatorFactory(ILogicalExpression 
expr, IVariableTypeEnvironment env,
-            IOperatorSchema[] inputSchemas, JobGenContext context) throws 
AlgebricksException {
-        return lejg.createEvaluatorFactory(expr, env, inputSchemas, context);
-    }
-
-    @Override
-    public IAggregateEvaluatorFactory 
createAggregateFunctionFactory(AggregateFunctionCallExpression expr,
-            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
-                    throws AlgebricksException {
-        return lejg.createAggregateFunctionFactory(expr, env, inputSchemas, 
context);
-    }
-
-    @Override
-    public ISerializedAggregateEvaluatorFactory 
createSerializableAggregateFunctionFactory(
-            AggregateFunctionCallExpression expr, IVariableTypeEnvironment 
env, IOperatorSchema[] inputSchemas,
-            JobGenContext context) throws AlgebricksException {
-        return lejg.createSerializableAggregateFunctionFactory(expr, env, 
inputSchemas, context);
-    }
-
-    @Override
-    public IRunningAggregateEvaluatorFactory 
createRunningAggregateFunctionFactory(StatefulFunctionCallExpression expr,
-            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
-                    throws AlgebricksException {
-        return lejg.createRunningAggregateFunctionFactory(expr, env, 
inputSchemas, context);
-    }
-
-    @Override
-    public IUnnestingEvaluatorFactory 
createUnnestingFunctionFactory(UnnestingFunctionCallExpression expr,
-            IVariableTypeEnvironment env, IOperatorSchema[] inputSchemas, 
JobGenContext context)
-                    throws AlgebricksException {
-        return lejg.createUnnestingFunctionFactory(expr, env, inputSchemas, 
context);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
index bdeb018..0acb40a 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/IBinaryComparatorFactoryProvider.java
@@ -21,7 +21,19 @@ package org.apache.hyracks.algebricks.data;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 
+/**
+ * Provides {@link 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} for different 
types
+ */
+@FunctionalInterface
 public interface IBinaryComparatorFactoryProvider {
-    public IBinaryComparatorFactory getBinaryComparatorFactory(Object type, 
boolean ascending)
-            throws AlgebricksException;
+    /**
+     * @param type
+     *            the type of the binary data
+     * @param ascending
+     *            the order direction. true if ascending order is desired, 
false otherwise
+     * @return the appropriate {@link 
org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory} instance
+     * @throws AlgebricksException
+     *             if the comparator factory for the passed type could not be 
created
+     */
+    IBinaryComparatorFactory getBinaryComparatorFactory(Object type, boolean 
ascending) throws AlgebricksException;
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java
 
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java
index 3bac25d..1e502b3 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-data/src/main/java/org/apache/hyracks/algebricks/data/ITypeTraitProvider.java
@@ -20,6 +20,10 @@ package org.apache.hyracks.algebricks.data;
 
 import org.apache.hyracks.api.dataflow.value.ITypeTraits;
 
+/**
+ * Provides {@link org.apache.hyracks.api.dataflow.value.ITypeTraits} for 
different data types
+ */
+@FunctionalInterface
 public interface ITypeTraitProvider {
-    public ITypeTraits getTypeTrait(Object type);
+    ITypeTraits getTypeTrait(Object type);
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
 
b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
index 0ac08c3..c8b6e59 100644
--- 
a/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
+++ 
b/hyracks-fullstack/algebricks/algebricks-examples/piglet-example/src/main/java/org/apache/hyracks/algebricks/examples/piglet/compiler/PigletCompiler.java
@@ -44,7 +44,7 @@ import 
org.apache.hyracks.algebricks.core.algebra.base.LogicalVariable;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ConstantExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IExpressionTypeComputer;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.IVariableTypeEnvironment;
-import 
org.apache.hyracks.algebricks.core.algebra.expressions.LogicalExpressionJobGenToExpressionRuntimeProviderAdapter;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.ExpressionRuntimeProvider;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.ScalarFunctionCallExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.expressions.VariableReferenceExpression;
 import 
org.apache.hyracks.algebricks.core.algebra.functions.AlgebricksBuiltinFunctions;
@@ -153,7 +153,7 @@ public class PigletCompiler {
         });
         builder.setPrinterProvider(PigletPrinterFactoryProvider.INSTANCE);
         builder.setExpressionRuntimeProvider(
-                new 
LogicalExpressionJobGenToExpressionRuntimeProviderAdapter(new 
PigletExpressionJobGen()));
+                new ExpressionRuntimeProvider(new PigletExpressionJobGen()));
         builder.setExpressionTypeComputer(new IExpressionTypeComputer() {
             @Override
             public Object getType(ILogicalExpression expr, 
IMetadataProvider<?, ?> metadataProvider,

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/ef9be0f9/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
index 9463982..8f005d8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-dataflow-common/src/main/java/org/apache/hyracks/dataflow/common/io/MessagingFrameTupleAppender.java
@@ -28,7 +28,7 @@ import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.util.HyracksConstants;
 import org.apache.hyracks.dataflow.common.comm.io.FrameTupleAppender;
-import org.apache.hyracks.dataflow.common.util.TaskUtils;
+import org.apache.hyracks.dataflow.common.utils.TaskUtil;
 import org.apache.hyracks.util.IntSerDeUtils;
 
 /**
@@ -112,7 +112,7 @@ public class MessagingFrameTupleAppender extends 
FrameTupleAppender {
     @Override
     public void write(IFrameWriter outWriter, boolean clearFrame) throws 
HyracksDataException {
         if (!initialized) {
-            message = TaskUtils.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, 
ctx);
+            message = TaskUtil.<VSizeFrame> get(HyracksConstants.KEY_MESSAGE, 
ctx);
             initialized = true;
         }
         // If message fits, we append it, otherwise, we append a null message, 
then send a message only

Reply via email to