http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java index 3de6a1b..81f6aa0 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java @@ -18,324 +18,687 @@ */ package org.apache.apex.malhar.lib.wal; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; +import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Set; import java.util.SortedSet; +import java.util.TreeSet; import javax.validation.constraints.NotNull; -import org.apache.hadoop.conf.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager; +import org.apache.apex.malhar.lib.utils.FileContextUtils; +import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import com.esotericsoftware.kryo.Kryo; +import com.esotericsoftware.kryo.io.Input; +import com.esotericsoftware.kryo.io.Output; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; -import com.google.common.collect.Sets; -import com.google.common.collect.TreeMultimap; +import com.google.common.primitives.Longs; import com.datatorrent.api.Context; import 
com.datatorrent.api.DAG; import com.datatorrent.api.annotation.Stateless; -import com.datatorrent.common.util.FSStorageAgent; +import com.datatorrent.lib.io.fs.AbstractFileInputOperator; +import com.datatorrent.lib.util.KryoCloneUtils; +import com.datatorrent.netlet.util.Slice; /** - * An {@link WindowDataManager} that uses FS to persist state. + * An {@link WindowDataManager} that uses FS to persist state every completed application window.<p/> + * + * FSWindowDataManager uses {@link FSWindowReplayWAL} to write to files. While saving an artifact corresponding + * to a window, the window data manager saves: + * <ol> + * <li>Window id</li> + * <li>Artifact</li> + * </ol> + * In order to ensure that all the entries corresponding to a window id are appended to the same wal part file, the + * wal operates in batch mode. In batch mode, the rotation of a wal part is done only after a batch is complete.<br/> + * <p/> + * + * <b>Replaying data of a completed window</b><br/> + * Main support that {@link WindowDataManager} provides to input operators is to be able to replay windows which + * were completely processed but not checkpointed. This is necessary for making input operators idempotent.<br/> + * The {@link FileSystemWAL}, however, ignores any data which is not checkpointed after failure. Therefore, + * {@link FSWindowDataManager} cannot rely solely on the state in wal after failures and so during recovery it modifies + * the wal state by traversing through the wal files.<br/> + * <br/> + * {@link IncrementalCheckpointManager}, however, relies only on the checkpointed state and therefore sets + * {@link #relyOnCheckpoints} to true. This is because {@link IncrementalCheckpointManager} only saves data per + * checkpoint window. + * <p/> + * + * <b>Purging of stale artifacts</b><br/> + * When a window gets committed, it indicates that all the operators in the DAG have completely finished processing that + * window. 
This means that the data of this window can be deleted as it will never be requested for replaying. + * Operators can invoke {@link #committed(long)} callback of {@link FSWindowDataManager} to trigger deletion of stale + * artifacts.<br/> + * <p/> + * + * <b>Dynamic partitioning support provided</b><br/> + * An operator can call {@link #partition(int, Set)} to get new instances of {@link FSWindowDataManager} during + * re-partitioning. When operator partitions are removed, then one of the new instances will handle the state of + * all deleted instances.<br/> + * After re-partitioning, the largest completed window is the min of max completed windows across all partitions.<br/> + * + * <p/> + * At times, after re-partitioning, a physical operator may want to read the data saved by all the partitions for a + * completed window id. For example, {@link AbstractFileInputOperator} needs to redistribute files based on the hash + * of file-paths and its partition keys, so it reads artifacts saved by all partitions during replay of a completed + * window. {@link #retrieveAllPartitions(long)} retrieves the artifacts of all partitions with respect to a completed window. + * * * @since 3.4.0 */ public class FSWindowDataManager implements WindowDataManager { - private static final String DEF_RECOVERY_PATH = "idempotentState"; - - protected transient FSStorageAgent storageAgent; + private static final String DEF_STATE_PATH = "idempotentState"; + private static final String WAL_FILE_NAME = "wal"; /** - * Recovery path relative to app path where state is saved. + * State path relative to app filePath where state is saved. */ @NotNull - private String recoveryPath; - - private boolean isRecoveryPathRelativeToAppPath = true; + private String statePath = DEF_STATE_PATH; - /** - * largest window for which there is recovery data across all physical operator instances. 
- */ - protected transient long largestRecoveryWindow; + private boolean isStatePathRelativeToAppPath = true; /** * This is not null only for one physical instance.<br/> * It consists of operator ids which have been deleted but have some state that can be replayed. - * Only one of the instances would be handling (modifying) the files that belong to this state. + * Only one of the instances would be handling (modifying) the files that belong to this state. <br/> + * The value is assigned during partitioning. */ - protected Set<Integer> deletedOperators; + private Set<Integer> deletedOperators; + + private boolean repartitioned; /** - * Sorted mapping from window id to all the operators that have state to replay for that window. + * Used when it is not necessary to replay every streaming/app window. + * Used by {@link IncrementalCheckpointManager} */ - protected final transient TreeMultimap<Long, Integer> replayState; + private boolean relyOnCheckpoints; + + private transient long largestCompletedWindow = Stateless.WINDOW_ID; + + private final FSWindowReplayWAL wal = new FSWindowReplayWAL(); + + //operator id -> wals (sorted) + private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>(); + + private transient String fullStatePath; + private transient int operatorId; + + private final transient Kryo kryo = new Kryo(); - protected transient FileSystem fs; - protected transient Path appPath; + private transient FileContext fileContext; public FSWindowDataManager() { - replayState = TreeMultimap.create(); - largestRecoveryWindow = Stateless.WINDOW_ID; - recoveryPath = DEF_RECOVERY_PATH; + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } @Override public void setup(Context.OperatorContext context) { - Configuration configuration = new Configuration(); - if (isRecoveryPathRelativeToAppPath) { - appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); + operatorId = context.getId(); + + if 
(isStatePathRelativeToAppPath) { + fullStatePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + statePath; } else { - appPath = new Path(recoveryPath); + fullStatePath = statePath; } try { - storageAgent = new FSStorageAgent(appPath.toString(), configuration); - - fs = FileSystem.newInstance(appPath.toUri(), configuration); - - if (fs.exists(appPath)) { - FileStatus[] fileStatuses = fs.listStatus(appPath); - - for (FileStatus operatorDirStatus : fileStatuses) { - int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName()); - - for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) { - String fileName = status.getPath().getName(); - if (fileName.endsWith(FSStorageAgent.TMP_FILE)) { - continue; - } - long windowId = Long.parseLong(fileName, 16); - replayState.put(windowId, operatorId); - if (windowId > largestRecoveryWindow) { - largestRecoveryWindow = windowId; - } + fileContext = FileContextUtils.getFileContext(fullStatePath); + setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private void setupWals(long activationWindow) throws IOException + { + findFiles(wal, operatorId); + configureWal(wal, operatorId, !relyOnCheckpoints); + + if (repartitioned) { + createReadOnlyWals(); + for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) { + findFiles(entry.getValue(), entry.getKey()); + configureWal(entry.getValue(), entry.getKey(), true); + } + } + + //find largest completed window + if (!relyOnCheckpoints) { + long completedWindow = findLargestCompletedWindow(wal, null); + //committed will not delete temp files so it is possible that when reading from files, a smaller window + //than the activation window is found. 
+ if (completedWindow > activationWindow) { + largestCompletedWindow = completedWindow; + } + if (wal.getReader().getCurrentPointer() != null) { + wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy()); + } + } else { + wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer(); + largestCompletedWindow = wal.getLastCheckpointedWindow(); + } + + if (repartitioned && largestCompletedWindow > Stateless.WINDOW_ID) { + //find the min of max window ids: a downstream will not finish a window until all the upstream have finished it. + for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) { + + long completedWindow = Stateless.WINDOW_ID; + if (!relyOnCheckpoints) { + long window = findLargestCompletedWindow(entry.getValue(), null); + if (window > activationWindow) { + completedWindow = window; } + } else { + completedWindow = findLargestCompletedWindow(entry.getValue(), activationWindow); + } + + if (completedWindow < largestCompletedWindow) { + largestCompletedWindow = completedWindow; } } - } catch (IOException e) { - throw new RuntimeException(e); } + + //reset readers + wal.getReader().seek(wal.walStartPointer); + for (FSWindowReplayWAL wal : readOnlyWals.values()) { + wal.getReader().seek(wal.walStartPointer); + } + + wal.setup(); + for (FSWindowReplayWAL wal : readOnlyWals.values()) { + wal.setup(); + } + } - @Override - public void save(Object object, int operatorId, long windowId) throws IOException + protected void createReadOnlyWals() throws IOException { - storageAgent.save(object, operatorId, windowId); + RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(fullStatePath)); + while (operatorsIter.hasNext()) { + FileStatus status = operatorsIter.next(); + int operatorId = Integer.parseInt(status.getPath().getName()); + + if (operatorId != this.operatorId) { + //create read-only wal for other partitions + FSWindowReplayWAL wal = new FSWindowReplayWAL(true); + 
readOnlyWals.put(operatorId, wal); + } + } } - @Override - public Object load(int operatorId, long windowId) throws IOException + private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState) throws IOException { - Set<Integer> operators = replayState.get(windowId); - if (operators == null || !operators.contains(operatorId)) { - return null; + String operatorDir = fullStatePath + Path.SEPARATOR + operatorId; + wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME); + wal.fileContext = fileContext; + + if (updateWalState) { + if (!wal.fileDescriptors.isEmpty()) { + SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet(); + + wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0); + + FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last(); + if (last.isTmp) { + wal.tempPartFiles.put(last.part, last.filePath.toString()); + } + } } - return storageAgent.load(operatorId, windowId); } - @Override - public void delete(int operatorId, long windowId) throws IOException + private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException { - storageAgent.delete(operatorId, windowId); + String operatorDir = fullStatePath + Path.SEPARATOR + operatorId; + Path operatorPath = new Path(operatorDir); + if (fileContext.util().exists(operatorPath)) { + RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath); + + while (walFilesIter.hasNext()) { + FileStatus fileStatus = walFilesIter.next(); + FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath()); + wal.fileDescriptors.put(descriptor.part, descriptor); + } + } } - @Override - public Map<Integer, Object> load(long windowId) throws IOException + private long findLargestCompletedWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws IOException + { + if (!wal.fileDescriptors.isEmpty()) { + FileSystemWAL.FileSystemWALReader reader = 
wal.getReader(); + + //to find the largest window, we only need to look at the last file. + NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet(); + for (int part : descendingParts) { + FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last(); + reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0)); + + long endOffset = -1; + + long lastWindow = Stateless.WINDOW_ID; + Slice slice = readNext(reader); + + while (slice != null) { + boolean skipComplete = skipNext(reader); //skip the artifact because we need just the largest window id. + if (!skipComplete) { + //artifact not saved so this window was not finished. + break; + } + long offset = reader.getCurrentPointer().getOffset(); + + long window = Longs.fromByteArray(slice.toByteArray()); + if (ceilingWindow != null && window > ceilingWindow) { + break; + } + endOffset = offset; + lastWindow = window; + slice = readNext(reader); //either null or next window + } + + if (endOffset != -1) { + wal.walEndPointerAfterRecovery = new FileSystemWAL.FileSystemWALPointer(last.part, endOffset); + wal.windowWalParts.put(lastWindow, wal.walEndPointerAfterRecovery.getPartNum()); + return lastWindow; + } + } + } + return Stateless.WINDOW_ID; + } + + /** + * Helper method that catches IOException while reading from wal to check if an entry was saved completely or not. + * @param reader wal reader + * @return wal entry + */ + protected Slice readNext(FileSystemWAL.FileSystemWALReader reader) { - Set<Integer> operators = replayState.get(windowId); - if (operators == null) { + try { + return reader.next(); + } catch (IOException ex) { + //exception while reading wal entry which can be because there may have been failure while persisting an + //artifact so this window is not a finished window. + try { + reader.close(); + } catch (IOException ioe) { + //closing the reader quietly. 
+ } return null; } - Map<Integer, Object> data = Maps.newHashMap(); - for (int operatorId : operators) { - data.put(operatorId, load(operatorId, windowId)); + } + + /** + * Helper method that catches IOException while skipping an entry from wal to check if an entry was saved + * completely or not. + * @param reader wal reader + * @return true if skip was successful; false otherwise. + */ + private boolean skipNext(FileSystemWAL.FileSystemWALReader reader) + { + try { + reader.skipNext(); + return true; + } catch (IOException ex) { + //exception while skipping wal entry which can be because there may have been failure while persisting an + //artifact so this window is not a finished window. + try { + reader.close(); + } catch (IOException e) { + //closing the reader quietly + } + return false; } - return data; } - @Override - public long[] getWindowIds(int operatorId) throws IOException + private void closeReaders() throws IOException { - Path operatorPath = new Path(appPath, String.valueOf(operatorId)); - if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) { - return null; + //close all reader stream and remove read-only wals + wal.getReader().close(); + if (readOnlyWals.size() > 0) { + Iterator<Map.Entry<Integer, FSWindowReplayWAL>> walIterator = readOnlyWals.entrySet().iterator(); + while (walIterator.hasNext()) { + Map.Entry<Integer, FSWindowReplayWAL> entry = walIterator.next(); + entry.getValue().getReader().close(); + + int operatorId = entry.getKey(); + if (deletedOperators == null || !deletedOperators.contains(operatorId)) { + //the read only wal can be removed. 
+ walIterator.remove(); + } + } } - return storageAgent.getWindowIds(operatorId); + } + + /** + * Save writes 2 entries to the wal: <br/> + * <ol> + * <li>window id</li> + * <li>artifact</li> + * </ol> + * Note: The wal is being used in batch mode so the part file will never be rotated between the 2 entries.<br/> + * The wal part file may be rotated after both the entries, when + * {@link FileSystemWAL.FileSystemWALWriter#rotateIfNecessary()} is triggered. + * + * @param object state + * @param windowId window id + * @throws IOException + */ + @Override + public void save(Object object, long windowId) throws IOException + { + closeReaders(); + FileSystemWAL.FileSystemWALWriter writer = wal.getWriter(); + + byte[] windowIdBytes = Longs.toByteArray(windowId); + writer.append(new Slice(windowIdBytes)); + writer.append(toSlice(object)); + wal.beforeCheckpoint(windowId); + wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum()); + writer.rotateIfNecessary(); } + /** + * The implementation assumes that artifacts are retrieved in increasing order of window ids. Typically it is used + * to replay tuples of successive windows in input operators after failure. + * @param windowId window id + * @return saved state for the window id. 
+ * @throws IOException + */ @Override - public long[] getWindowIds() throws IOException + public Object retrieve(long windowId) throws IOException { - SortedSet<Long> windowIds = replayState.keySet(); - long[] windowIdsArray = new long[windowIds.size()]; + return retrieve(wal, windowId); + } - int index = 0; + @Override + public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException + { + if (windowId > largestCompletedWindow) { + return null; + } + Map<Integer, Object> artifacts = Maps.newHashMap(); + Object artifact = retrieve(wal, windowId); + if (artifact != null) { + artifacts.put(operatorId, artifact); + } + if (repartitioned) { + for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) { + artifact = retrieve(entry.getValue(), windowId); + if (artifact != null) { + artifacts.put(entry.getKey(), artifact); + } + } + } + return artifacts; + } - for (Long windowId: windowIds) { - windowIdsArray[index] = windowId; - index++; + private Object retrieve(FSWindowReplayWAL wal, long windowId) throws IOException + { + if (windowId > largestCompletedWindow || wal.walEndPointerAfterRecovery == null) { + return null; } - return windowIdsArray; - } + FileSystemWAL.FileSystemWALReader reader = wal.getReader(); + + while (reader.getCurrentPointer() == null || + reader.getCurrentPointer().compareTo(wal.walEndPointerAfterRecovery) < 0) { + long currentWindow; + if (wal.retrievedWindow == null) { + wal.retrievedWindow = readNext(reader); + Preconditions.checkNotNull(wal.retrievedWindow); + } + currentWindow = Longs.fromByteArray(wal.retrievedWindow.toByteArray()); + + if (windowId == currentWindow) { + Slice data = readNext(reader); + Preconditions.checkNotNull(data, "data is null"); + + wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum()); + wal.retrievedWindow = readNext(reader); //null or next window + + return fromSlice(data); + } else if (windowId < currentWindow) { + //no artifact saved corresponding to 
that window and artifact is not read. + return null; + } else { + //windowId > current window so we skip the data + skipNext(reader); + wal.windowWalParts.put(currentWindow, reader.getCurrentPointer().getPartNum()); + + wal.retrievedWindow = readNext(reader); //null or next window + if (wal.retrievedWindow == null) { + //nothing else to read + return null; + } + } + } + return null; + } + /** - * This deletes all the recovery files of window ids <= windowId. + * Deletes artifacts for all windows less than or equal to committed window id.<p/> + * + * {@link FSWindowDataManager} uses {@link FSWindowReplayWAL} to record data which writes to temp part files. + * The temp part files are finalized only when they are rotated. So when a window is committed, artifacts for + * windows <= committed window may still be in temporary files. These temporary files are needed for Wal recovery so + * we do not alter them and we delete a part file completely (as opposed to partial deletion) for efficiency.<br/> + * Therefore, data of a window gets deleted only when it satisfies all the following criteria: + * <ul> + * <li>window <= committed window id</li> + * <li>the part file of the artifact is rotated.</li> + * <li>the part file doesn't contain artifacts for windows greater than the artifact's window to avoid partial + * file deletion.</li> + * </ul> * - * @param operatorId operator id. - * @param windowId the largest window id for which the states will be deleted. 
+ * In addition to this we also delete: + * <ol> + * <li>Some stray temporary files are also deleted which correspond to completely deleted parts.</li> + * <li>Once the committed window > largest recovery window, we delete the files of partitions that were removed.</li> + * </ol> + * + * @param committedWindowId window id * @throws IOException */ @Override - public void deleteUpTo(int operatorId, long windowId) throws IOException + public void committed(long committedWindowId) throws IOException { - //deleting the replay state - if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) { - Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next(); - long lwindow = windowEntry.getKey(); - if (lwindow > windowId) { - break; - } - for (Integer loperator : windowEntry.getValue()) { - - if (deletedOperators.contains(loperator)) { - storageAgent.delete(loperator, lwindow); - - Path loperatorPath = new Path(appPath, Integer.toString(loperator)); - if (fs.listStatus(loperatorPath).length == 0) { - //The operator was deleted and it has nothing to replay. - deletedOperators.remove(loperator); - fs.delete(loperatorPath, true); - } - } else if (loperator == operatorId) { - storageAgent.delete(loperator, lwindow); + closeReaders(); + //find the largest window <= committed window id and the part file corresponding to it is finalized. + Map.Entry<Long, Integer> largestEntryForDeletion = null; + + Iterator<Map.Entry<Long, Integer>> iterator = wal.windowWalParts.entrySet().iterator(); + + while (iterator.hasNext()) { + Map.Entry<Long, Integer> entry = iterator.next(); + //only completely finalized part files are deleted. 
+ if (entry.getKey() <= committedWindowId && !wal.tempPartFiles.containsKey(entry.getValue())) { + largestEntryForDeletion = entry; + iterator.remove(); + } + if (entry.getKey() > committedWindowId) { + break; + } + } + + if (largestEntryForDeletion != null && !wal.windowWalParts.containsValue( + largestEntryForDeletion.getValue()) /* no artifacts for higher window present*/) { + + int highestPartToDelete = largestEntryForDeletion.getValue(); + wal.getWriter().delete(new FileSystemWAL.FileSystemWALPointer(highestPartToDelete + 1, 0)); + + //also delete any old stray temp files that correspond to parts < deleteTillPointer.partNum + Iterator<Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor>> fileIterator = + wal.fileDescriptors.entries().iterator(); + while (fileIterator.hasNext()) { + Map.Entry<Integer, FSWindowReplayWAL.FileDescriptor> entry = fileIterator.next(); + if (entry.getKey() <= highestPartToDelete && entry.getValue().isTmp) { + if (fileContext.util().exists(entry.getValue().filePath)) { + fileContext.delete(entry.getValue().filePath, true); } + } else if (entry.getKey() > highestPartToDelete) { + break; } - iterator.remove(); } } - if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) { - long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId); - Arrays.sort(windowsAfterReplay); - for (long lwindow : windowsAfterReplay) { - if (lwindow <= windowId) { - storageAgent.delete(operatorId, lwindow); + //delete data of partitions that have been removed + if (deletedOperators != null) { + Iterator<Integer> operatorIter = deletedOperators.iterator(); + + while (operatorIter.hasNext()) { + int deletedOperatorId = operatorIter.next(); + FSWindowReplayWAL wal = readOnlyWals.get(deletedOperatorId); + if (committedWindowId > largestCompletedWindow) { + Path operatorDir = new Path(fullStatePath + Path.SEPARATOR + deletedOperatorId); + + if (fileContext.util().exists(operatorDir)) { + fileContext.delete(operatorDir, true); + } + 
wal.teardown(); + operatorIter.remove(); + readOnlyWals.remove(deletedOperatorId); } } + + if (deletedOperators.isEmpty()) { + deletedOperators = null; + } } } - @Override - public long getLargestRecoveryWindow() + private Slice toSlice(Object object) + { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + Output output = new Output(baos); + kryo.writeClassAndObject(output, object); + output.close(); + byte[] bytes = baos.toByteArray(); + + return new Slice(bytes); + } + + protected Object fromSlice(Slice slice) + { + Input input = new Input(slice.buffer, slice.offset, slice.length); + Object object = kryo.readClassAndObject(input); + input.close(); + return object; + } + + public long getLargestCompletedWindow() { - return largestRecoveryWindow; + return largestCompletedWindow; } @Override - public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds) + public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds) { - Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(), - "there has to be one idempotent storage manager"); - org.apache.apex.malhar.lib.wal.FSWindowDataManager deletedOperatorsManager = null; + repartitioned = true; + KryoCloneUtils<FSWindowDataManager> cloneUtils = KryoCloneUtils.createCloneUtils(this); + + FSWindowDataManager[] windowDataManagers = cloneUtils.getClones(newCount); if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) { - if (this.deletedOperators == null) { - this.deletedOperators = Sets.newHashSet(); - } - this.deletedOperators.addAll(removedOperatorIds); + windowDataManagers[0].deletedOperators = removedOperatorIds; } - for (WindowDataManager storageManager : newManagers) { - - org.apache.apex.malhar.lib.wal.FSWindowDataManager lmanager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)storageManager; - lmanager.recoveryPath = this.recoveryPath; - lmanager.storageAgent = this.storageAgent; + List<WindowDataManager> 
mangers = new ArrayList<>(); + mangers.addAll(Arrays.asList(windowDataManagers)); + return mangers; + } - if (lmanager.deletedOperators != null) { - deletedOperatorsManager = lmanager; - } - //only one physical instance can manage deleted operators so clearing this field for rest of the instances. - if (lmanager != deletedOperatorsManager) { - lmanager.deletedOperators = null; - } + @Override + public void teardown() + { + wal.teardown(); + for (FSWindowReplayWAL wal : readOnlyWals.values()) { + wal.teardown(); } + } - if (removedOperatorIds == null || removedOperatorIds.isEmpty()) { - //Nothing to do - return; - } - if (this.deletedOperators != null) { - - /*If some operators were removed then there needs to be a manager which can clean there state when it is not - needed.*/ - if (deletedOperatorsManager == null) { - //None of the managers were handling deleted operators data. - deletedOperatorsManager = (org.apache.apex.malhar.lib.wal.FSWindowDataManager)newManagers.iterator().next(); - deletedOperatorsManager.deletedOperators = Sets.newHashSet(); - } + protected void setRelyOnCheckpoints(boolean relyOnCheckpoints) + { + this.relyOnCheckpoints = relyOnCheckpoints; + } - deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds); - } + /** + * @return wal instance + */ + protected FSWindowReplayWAL getWal() + { + return wal; } - @Override - public void teardown() + @VisibleForTesting + public Set<Integer> getDeletedOperators() { - try { - fs.close(); - } catch (IOException e) { - throw new RuntimeException(e); + if (deletedOperators == null) { + return null; } + return ImmutableSet.copyOf(deletedOperators); } /** - * @return recovery path + * @return recovery filePath */ - public String getRecoveryPath() + public String getStatePath() { - return recoveryPath; + return statePath; } /** - * Sets the recovery path. 
If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative - * to the application path; otherwise it is handled as an absolute path. + * Sets the state path. If {@link #isStatePathRelativeToAppPath} is true then this filePath is handled + * relative + * to the application filePath; otherwise it is handled as an absolute filePath. * - * @param recoveryPath recovery path + * @param statePath recovery filePath */ - public void setRecoveryPath(String recoveryPath) + public void setStatePath(String statePath) { - this.recoveryPath = recoveryPath; + this.statePath = statePath; } /** - * @return true if recovery path is relative to app path; false otherwise. + * @return true if state path is relative to app path; false otherwise. */ - public boolean isRecoveryPathRelativeToAppPath() + public boolean isStatePathRelativeToAppPath() { - return isRecoveryPathRelativeToAppPath; + return isStatePathRelativeToAppPath; } /** - * Specifies whether the recovery path is relative to application path. + * Specifies whether the state path is relative to application filePath. * - * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise. + * @param statePathRelativeToAppPath true if state path is relative to application path; false + * otherwise. */ - public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath) + public void setStatePathRelativeToAppPath(boolean statePathRelativeToAppPath) { - isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath; + isStatePathRelativeToAppPath = statePathRelativeToAppPath; } + + private static Logger LOG = LoggerFactory.getLogger(FSWindowDataManager.class); }
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java new file mode 100644 index 0000000..326c7a3 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowReplayWAL.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.wal; + +import java.io.IOException; +import java.util.TreeMap; + +import javax.validation.constraints.NotNull; + +import org.apache.hadoop.fs.Path; + +import com.google.common.collect.TreeMultimap; + +import com.datatorrent.netlet.util.Slice; + +/** + * A {@link FileSystemWAL} that WindowDataManager uses to save state of every window. + */ +public class FSWindowReplayWAL extends FileSystemWAL +{ + transient boolean readOnly; + + transient TreeMultimap<Integer, FileDescriptor> fileDescriptors = TreeMultimap.create(); + + //all the readers will read to this point while replaying. 
+ transient FileSystemWALPointer walEndPointerAfterRecovery; + transient Slice retrievedWindow; + + transient TreeMap<Long, Integer> windowWalParts = new TreeMap<>(); + + FSWindowReplayWAL() + { + super(); + setInBatchMode(true); + setFileSystemWALWriter(new WriterThatFinalizesImmediately(this)); + } + + FSWindowReplayWAL(boolean readOnly) + { + this(); + this.readOnly = readOnly; + } + + @Override + public void setup() + { + try { + if (getMaxLength() == 0) { + setMaxLength(fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize()); + } + if (!readOnly) { + getWriter().recover(); + } + } catch (IOException e) { + throw new RuntimeException("while setup"); + } + } + + public FileSystemWALPointer getWalEndPointerAfterRecovery() + { + return walEndPointerAfterRecovery; + } + + /** + * Finalizes files just after rotation. Doesn't wait for the window to be committed. + */ + static class WriterThatFinalizesImmediately extends FileSystemWAL.FileSystemWALWriter + { + + private WriterThatFinalizesImmediately() + { + super(); + } + + protected WriterThatFinalizesImmediately(@NotNull FileSystemWAL fileSystemWal) + { + super(fileSystemWal); + } + + /** + * Finalize the file immediately after rotation. 
+ * @param partNum part number + * @throws IOException + */ + @Override + protected void rotated(int partNum) throws IOException + { + finalize(partNum); + } + + @Override + protected void recover() throws IOException + { + restoreActivePart(); + } + } + + static class FileDescriptor implements Comparable<FileDescriptor> + { + int part; + boolean isTmp; + long time; + Path filePath; + + static FileDescriptor create(Path filePath) + { + FileDescriptor descriptor = new FileDescriptor(); + descriptor.filePath = filePath; + + String name = filePath.getName(); + String[] parts = name.split("\\."); + + String[] namePart = parts[0].split("_"); + descriptor.part = Integer.parseInt(namePart[1]); + + if (parts.length == 3) { + descriptor.isTmp = true; + descriptor.time = Long.parseLong(parts[1]); + } + + return descriptor; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (!(o instanceof FileDescriptor)) { + return false; + } + + FileDescriptor that = (FileDescriptor)o; + + if (part != that.part) { + return false; + } + if (isTmp != that.isTmp) { + return false; + } + return time == that.time; + + } + + @Override + public int hashCode() + { + int result = part; + result = 31 * result + (isTmp ? 
1 : 0); + result = 31 * result + (int)(time ^ (time >>> 32)); + return result; + } + + @Override + public int compareTo(FileDescriptor o) + { + if (part < o.part) { + return -1; + } else if (part > o.part) { + return 1; + } else { + if (isTmp && !o.isTmp) { + return -1; + } else if (!isTmp && o.isTmp) { + return 1; + } + return Long.compare(time, o.time); + } + } + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java index f454188..b7d5ba1 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java @@ -56,7 +56,7 @@ import com.datatorrent.netlet.util.Slice; * <p/> * Note:<br/> * The FileSystem Writer and Reader operations should not alternate because intermingling these operations will cause - * problems. Typically the WAL Reader will only used in recovery.<br/> + * problems. Typically the WAL Reader will only be used in recovery or replay of finished windows.<br/> * * Also this implementation is thread unsafe- the filesystem wal writer and reader operations should be performed in * operator's thread.
@@ -73,7 +73,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil @Min(0) private long maxLength; - private FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0); + FileSystemWALPointer walStartPointer = new FileSystemWALPointer(0, 0); @NotNull private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new FileSystemWALReader(this); @@ -81,21 +81,26 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil @NotNull private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new FileSystemWALWriter(this); - //part => tmp file path; - private final Map<Integer, String> tempPartFiles = new TreeMap<>(); + //part => tmp file filePath; + final Map<Integer, String> tempPartFiles = new TreeMap<>(); private long lastCheckpointedWindow = Stateless.WINDOW_ID; + private boolean hardLimitOnMaxLength; + + private boolean inBatchMode; + + transient FileContext fileContext; + @Override public void setup() { try { - FileContext fileContext = FileContextUtils.getFileContext(filePath); + fileContext = FileContextUtils.getFileContext(filePath); if (maxLength == 0) { maxLength = fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize(); } - fileSystemWALWriter.open(fileContext); - fileSystemWALReader.open(fileContext); + fileSystemWALWriter.recover(); } catch (IOException e) { throw new RuntimeException("during setup", e); } @@ -158,6 +163,14 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil { return filePath + "_" + partNumber; } + + /** + * @return the wal start pointer + */ + public FileSystemWALPointer getWalStartPointer() + { + return walStartPointer; + } @Override public FileSystemWALReader getReader() @@ -227,6 +240,47 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil this.maxLength = maxLength; } + /** + * @return true if there is a hard limit on max length; false otherwise. 
+ */ + public boolean isHardLimitOnMaxLength() + { + return hardLimitOnMaxLength; + } + + /** + * When hard limit on max length is true, then a wal part file will never exceed the max length.<br/> + * Entry is appended to the next part if adding it to the current part exceeds the max length. + * <p/> + * When hard limit on max length is false, then a wal part file can exceed the max length if the last entry makes the + * wal part exceed the max length. By default this is set to false. + * + * @param hardLimitOnMaxLength true if a wal part file must never exceed the max length; false otherwise. + */ + public void setHardLimitOnMaxLength(boolean hardLimitOnMaxLength) + { + this.hardLimitOnMaxLength = hardLimitOnMaxLength; + } + + /** + * @return true if writing in batch mode; false otherwise. + */ + protected boolean isInBatchMode() + { + return inBatchMode; + } + + /** + * When in batch mode, a file is rotated only when a batch gets completed. This facilitates writing multiple entries + * that will all be written to the same part file. + * + * @param inBatchMode write in batch mode or not.
+ */ + protected void setInBatchMode(boolean inBatchMode) + { + this.inBatchMode = inBatchMode; + } + public static class FileSystemWALPointer implements Comparable<FileSystemWALPointer> { private final int partNum; @@ -261,6 +315,11 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil return offset; } + public FileSystemWALPointer getCopy() + { + return new FileSystemWALPointer(partNum, offset); + } + @Override public String toString() { @@ -273,16 +332,15 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil */ public static class FileSystemWALReader implements WAL.WALReader<FileSystemWALPointer> { - private FileSystemWALPointer currentPointer; + private transient FileSystemWALPointer currentPointer; private transient DataInputStream inputStream; private transient Path currentOpenPath; private transient boolean isOpenPathTmp; private final FileSystemWAL fileSystemWAL; - private transient FileContext fileContext; - private FileSystemWALReader() + protected FileSystemWALReader() { //for kryo fileSystemWAL = null; @@ -291,16 +349,10 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal) { this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal"); - currentPointer = new FileSystemWALPointer(fileSystemWal.walStartPointer.partNum, - fileSystemWal.walStartPointer.offset); } - protected void open(@NotNull FileContext fileContext) throws IOException - { - this.fileContext = Preconditions.checkNotNull(fileContext, "fileContext"); - } - - protected void close() throws IOException + @Override + public void close() throws IOException { if (inputStream != null) { inputStream.close(); @@ -350,9 +402,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil isOpenPathTmp = false; } - LOG.debug("path to read {} and pointer {}", pathToReadFrom, walPointer); - if 
(fileContext.util().exists(pathToReadFrom)) { - DataInputStream stream = fileContext.open(pathToReadFrom); + LOG.debug("filePath to read {} and pointer {}", pathToReadFrom, walPointer); + if (fileSystemWAL.fileContext.util().exists(pathToReadFrom)) { + DataInputStream stream = fileSystemWAL.fileContext.open(pathToReadFrom); if (walPointer.offset > 0) { stream.skip(walPointer.offset); } @@ -365,6 +417,20 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil @Override public Slice next() throws IOException { + return readOrSkip(false); + } + + @Override + public void skipNext() throws IOException + { + readOrSkip(true); + } + + private Slice readOrSkip(boolean skip) throws IOException + { + if (currentPointer == null) { + currentPointer = fileSystemWAL.walStartPointer; + } do { if (inputStream == null) { inputStream = getInputStream(currentPointer); @@ -376,21 +442,36 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil inputStream = getInputStream(currentPointer); } - if (inputStream != null && currentPointer.offset < fileContext.getFileStatus(currentOpenPath).getLen()) { + if (inputStream != null && currentPointer.offset < + fileSystemWAL.fileContext.getFileStatus(currentOpenPath).getLen()) { int len = inputStream.readInt(); Preconditions.checkState(len >= 0, "negative length"); - byte[] data = new byte[len]; - inputStream.readFully(data); + if (!skip) { + byte[] data = new byte[len]; + inputStream.readFully(data); + currentPointer.offset += len + 4; + return new Slice(data); - currentPointer.offset += data.length + 4; - return new Slice(data); + } else { + long actualSkipped = inputStream.skip(len); + if (actualSkipped != len) { + throw new IOException("unable to skip " + len); + } + currentPointer.offset += len + 4; + return null; + } } } while (nextSegment()); close(); return null; } + public FileSystemWALPointer getCurrentPointer() + { + return currentPointer; + } + @Override public 
FileSystemWALPointer getStartPointer() { @@ -410,52 +491,52 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil private final Map<Long, Integer> pendingFinalization = new TreeMap<>(); private final FileSystemWAL fileSystemWAL; - private transient FileContext fileContext; private int latestFinalizedPart = -1; private int lowestDeletedPart = -1; - private FileSystemWALWriter() + protected FileSystemWALWriter() { //for kryo fileSystemWAL = null; } - public FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal) + protected FileSystemWALWriter(@NotNull FileSystemWAL fileSystemWal) { this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal, "wal"); } - protected void open(@NotNull FileContext fileContext) throws IOException + protected void recover() throws IOException { - this.fileContext = Preconditions.checkNotNull(fileContext, "file context"); - recover(); + restoreActivePart(); + deleteStrayTmpFiles(); } - private void recover() throws IOException + void restoreActivePart() throws IOException { - LOG.debug("current point", currentPointer); + LOG.debug("restore part {}", currentPointer); String tmpFilePath = fileSystemWAL.tempPartFiles.get(currentPointer.getPartNum()); if (tmpFilePath != null) { - Path tmpPath = new Path(tmpFilePath); - if (fileContext.util().exists(tmpPath)) { - LOG.debug("tmp path exists {}", tmpPath); + Path inputPath = new Path(tmpFilePath); + if (fileSystemWAL.fileContext.util().exists(inputPath)) { + LOG.debug("input path exists {}", inputPath); + //temp file output stream outputStream = getOutputStream(new FileSystemWALPointer(currentPointer.partNum, 0)); - DataInputStream inputStreamOldTmp = fileContext.open(tmpPath); + DataInputStream inputStream = fileSystemWAL.fileContext.open(inputPath); - IOUtils.copyPartial(inputStreamOldTmp, currentPointer.offset, outputStream); + IOUtils.copyPartial(inputStream, currentPointer.offset, outputStream); - outputStream.flush(); - //remove old tmp - 
inputStreamOldTmp.close(); - LOG.debug("delete tmp {}", tmpPath); - fileContext.delete(tmpPath, true); + flush(); + inputStream.close(); } } + } - //find all valid path names + void deleteStrayTmpFiles() throws IOException + { + //find all valid filePath names Set<String> validPathNames = new HashSet<>(); for (Map.Entry<Integer, String> entry : fileSystemWAL.tempPartFiles.entrySet()) { if (entry.getKey() <= currentPointer.partNum) { @@ -468,22 +549,22 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil //which aren't accounted by tmp files map Path walPath = new Path(fileSystemWAL.filePath); Path parentWAL = walPath.getParent(); - if (parentWAL != null && fileContext.util().exists(parentWAL)) { - RemoteIterator<FileStatus> remoteIterator = fileContext.listStatus(parentWAL); + if (parentWAL != null && fileSystemWAL.fileContext.util().exists(parentWAL)) { + RemoteIterator<FileStatus> remoteIterator = fileSystemWAL.fileContext.listStatus(parentWAL); while (remoteIterator.hasNext()) { FileStatus status = remoteIterator.next(); String fileName = status.getPath().getName(); if (fileName.startsWith(walPath.getName()) && fileName.endsWith(TMP_EXTENSION) && !validPathNames.contains(fileName)) { LOG.debug("delete stray tmp {}", status.getPath()); - fileContext.delete(status.getPath(), true); + fileSystemWAL.fileContext.delete(status.getPath(), true); } } } - } - protected void close() throws IOException + @Override + public void close() throws IOException { if (outputStream != null) { outputStream.close(); @@ -499,23 +580,27 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil outputStream = getOutputStream(currentPointer); } - int entryLength = entry.length + 4; + int entrySize = entry.length + 4; + + if (fileSystemWAL.hardLimitOnMaxLength) { + Preconditions.checkArgument(entrySize > fileSystemWAL.maxLength, "entry too big. 
increase the max length"); + } // rotate if needed - if (shouldRotate(entryLength)) { + if (fileSystemWAL.hardLimitOnMaxLength && shouldRotate(entrySize) && !fileSystemWAL.inBatchMode) { rotate(true); } outputStream.writeInt(entry.length); outputStream.write(entry.buffer, entry.offset, entry.length); - currentPointer.offset += entryLength; + currentPointer.offset += entrySize; - if (currentPointer.offset == fileSystemWAL.maxLength) { + if (currentPointer.offset >= fileSystemWAL.maxLength && !fileSystemWAL.inBatchMode) { //if the file is completed then we can rotate it. do not have to wait for next entry rotate(false); } - return entryLength; + return entrySize; } @Override @@ -543,9 +628,9 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil if (i <= latestFinalizedPart) { //delete a part only if it is finalized. Path partPath = new Path(fileSystemWAL.getPartFilePath(i)); - if (fileContext.util().exists(partPath)) { + if (fileSystemWAL.fileContext.util().exists(partPath)) { LOG.debug("delete {}", partPath); - fileContext.delete(partPath, true); + fileSystemWAL.fileContext.delete(partPath, true); lastPartDeleted = i; } else { break; @@ -560,18 +645,18 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil if (pointer.partNum <= latestFinalizedPart && pointer.offset > 0) { String part = fileSystemWAL.getPartFilePath(pointer.partNum); Path inputPartPath = new Path(part); - long length = fileContext.getFileStatus(inputPartPath).getLen(); + long length = fileSystemWAL.fileContext.getFileStatus(inputPartPath).getLen(); LOG.debug("truncate {} from {} length {}", part, pointer.offset, length); if (length > pointer.offset) { - String temp = getTmpFilePath(part); + String temp = createTmpFilePath(part); Path tmpPart = new Path(temp); - DataInputStream inputStream = fileContext.open(inputPartPath); - DataOutputStream outputStream = fileContext.create(tmpPart, EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), - 
Options.CreateOpts.CreateParent.createParent()); + DataInputStream inputStream = fileSystemWAL.fileContext.open(inputPartPath); + DataOutputStream outputStream = fileSystemWAL.fileContext.create(tmpPart, + EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent()); IOUtils.copyPartial(inputStream, pointer.offset, length - pointer.offset, outputStream); inputStream.close(); @@ -582,7 +667,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil fileSystemWAL.walStartPointer.offset = 0; } - fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE); + fileSystemWAL.fileContext.rename(tmpPart, inputPartPath, Options.Rename.OVERWRITE); } } } @@ -590,8 +675,8 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil protected void flush() throws IOException { if (outputStream != null) { - if (fileContext.getDefaultFileSystem() instanceof LocalFs || - fileContext.getDefaultFileSystem() instanceof RawLocalFs) { + if (fileSystemWAL.fileContext.getDefaultFileSystem() instanceof LocalFs || + fileSystemWAL.fileContext.getDefaultFileSystem() instanceof RawLocalFs) { //until the stream is closed on the local FS, readers don't see any data. close(); } else { @@ -607,19 +692,38 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil return currentPointer.offset + entryLength > fileSystemWAL.maxLength; } - protected void rotate(boolean openNextFile) throws IOException + void rotate(boolean openNextFile) throws IOException { flush(); close(); - //all parts up to current part can be finalized. 
- pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), currentPointer.partNum); - LOG.debug("rotate {} to {}", currentPointer.partNum, currentPointer.partNum + 1); + int partNum = currentPointer.partNum; + LOG.debug("rotate {} to {}", partNum, currentPointer.partNum + 1); currentPointer = new FileSystemWALPointer(currentPointer.partNum + 1, 0); if (openNextFile) { //if adding the new entry to the file can cause the current file to exceed the max length then it is rotated. outputStream = getOutputStream(currentPointer); } + + rotated(partNum); + } + + /** + * When the wal is used in batch-mode, this method will trigger rotation if the current part file is completed. + * @throws IOException + */ + protected void rotateIfNecessary() throws IOException + { + if (fileSystemWAL.inBatchMode && currentPointer.offset >= fileSystemWAL.maxLength) { + //if the file is completed then we can rotate it + rotate(false); + } + } + + protected void rotated(int partNum) throws IOException + { + //all parts up to current part can be finalized. 
+ pendingFinalization.put(fileSystemWAL.getLastCheckpointedWindow(), partNum); } protected void finalizeFiles(long window) throws IOException @@ -640,14 +744,7 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil int partToFinalizeTill = entry.getValue(); for (int i = largestPartAvailable; i <= partToFinalizeTill; i++) { - String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(i); - Path tmpPath = new Path(tmpToFinalize); - - if (fileContext.util().exists(tmpPath)) { - LOG.debug("finalize {} of part {}", tmpToFinalize, i); - fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(i)), Options.Rename.OVERWRITE); - latestFinalizedPart = i; - } + finalize(i); } largestPartAvailable = partToFinalizeTill + 1; } @@ -659,37 +756,60 @@ public class FileSystemWAL implements WAL<FileSystemWAL.FileSystemWALReader, Fil } } + protected void finalize(int partNum) throws IOException + { + String tmpToFinalize = fileSystemWAL.tempPartFiles.remove(partNum); + Path tmpPath = new Path(tmpToFinalize); + if (fileSystemWAL.fileContext.util().exists(tmpPath)) { + LOG.debug("finalize {} of part {}", tmpPath, partNum); + fileSystemWAL.fileContext.rename(tmpPath, new Path(fileSystemWAL.getPartFilePath(partNum)), + Options.Rename.OVERWRITE); + latestFinalizedPart = partNum; + } + } + private DataOutputStream getOutputStream(FileSystemWALPointer pointer) throws IOException { Preconditions.checkArgument(outputStream == null, "output stream is not null"); - if (pointer.offset > 0 && (fileContext.getDefaultFileSystem() instanceof LocalFs || - fileContext.getDefaultFileSystem() instanceof RawLocalFs)) { - //on local file system the stream is closed instead of flush so we open it again in append mode if the - //offset > 0. 
- return fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)), + if (pointer.offset > 0 && (fileSystemWAL.fileContext.getDefaultFileSystem() instanceof LocalFs || + fileSystemWAL.fileContext.getDefaultFileSystem() instanceof RawLocalFs)) { + //On local file system the stream is always closed and never flushed so we open it again in append mode if the + //offset > 0. This block is entered only when appending to wal while writing on local fs. + return fileSystemWAL.fileContext.create(new Path(fileSystemWAL.tempPartFiles.get(pointer.partNum)), EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND), Options.CreateOpts.CreateParent.createParent()); } String partFile = fileSystemWAL.getPartFilePath(pointer.partNum); - String tmpFilePath = getTmpFilePath(partFile); + String tmpFilePath = createTmpFilePath(partFile); fileSystemWAL.tempPartFiles.put(pointer.partNum, tmpFilePath); Preconditions.checkArgument(pointer.offset == 0, "offset > 0"); LOG.debug("open {} => {}", pointer.partNum, tmpFilePath); - outputStream = fileContext.create(new Path(tmpFilePath), + outputStream = fileSystemWAL.fileContext.create(new Path(tmpFilePath), EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE), Options.CreateOpts.CreateParent.createParent()); return outputStream; } + //visible to WindowDataManager + FileSystemWALPointer getCurrentPointer() + { + return currentPointer; + } + + //visible to WindowDataManager + void setCurrentPointer(@NotNull FileSystemWALPointer pointer) + { + this.currentPointer = Preconditions.checkNotNull(pointer, "pointer"); + } } - private static String getTmpFilePath(String filePath) + private static String createTmpFilePath(String filePath) { return filePath + '.' 
+ System.currentTimeMillis() + TMP_EXTENSION; } - private static final String TMP_EXTENSION = ".tmp"; + static final String TMP_EXTENSION = ".tmp"; private static final Logger LOG = LoggerFactory.getLogger(FileSystemWAL.class); } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java index 45432d5..da21a6d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WAL.java @@ -51,7 +51,7 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter> * Provides iterator like interface to read entries from the WAL. * @param <P> type of Pointer in the WAL */ - interface WALReader<P> + interface WALReader<P> extends AutoCloseable { /** * Seek to middle of the WAL. This is used primarily during recovery, @@ -66,6 +66,11 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter> Slice next() throws IOException; /** + * Skips the next entry in the WAL. + */ + void skipNext() throws IOException; + + /** * Returns the start pointer from which data is available to read.<br/> * WAL Writer supports purging of aged data so the start pointer will change over time. * @@ -78,7 +83,7 @@ public interface WAL<READER extends WAL.WALReader, WRITER extends WAL.WALWriter> * Provide method to write entries to the WAL. 
* @param <P> type of Pointer in the WAL */ - interface WALWriter<P> + interface WALWriter<P> extends AutoCloseable { /** * Write an entry to the WAL http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java index a1917a6..2622a0d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/WindowDataManager.java @@ -19,37 +19,61 @@ package org.apache.apex.malhar.lib.wal; import java.io.IOException; -import java.util.Collection; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import com.datatorrent.api.Component; import com.datatorrent.api.Context; -import com.datatorrent.api.StorageAgent; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.io.fs.AbstractFileInputOperator; /** - * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. - * An idempotent agent cannot make any guarantees about the tuples emitted in the application window which fails. + * WindowDataManager manages the state of an operator every application window. It can be used to replay tuples in + * the input operator after re-deployment for a window which was not check-pointed but processing was completed before + * failure.<br/> + * + * However, it cannot make any guarantees about the tuples emitted in the application window during which the operator + * failed.<br/> * * The order of tuples is guaranteed for ordered input sources. 
* - * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow + * <b>Important:</b> In order for a WindowDataManager to function correctly it cannot allow * checkpoints to occur within an application window and checkpoints must be aligned with * application window boundaries. * * @since 2.0.0 */ -public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext> +public interface WindowDataManager extends Component<Context.OperatorContext> { /** - * Gets the largest window for which there is recovery data. + * Save the state for a window id. + * @param object state + * @param windowId window id + * @throws IOException + */ + void save(Object object, long windowId) throws IOException; + + /** + * Gets the object saved for the provided window id. <br/> + * Typically it is used to replay tuples of successive windows in input operators after failure. + * + * @param windowId window id + * @return saved state for the window id. + * @throws IOException + */ + Object retrieve(long windowId) throws IOException; + + /** + * Gets the largest window which was completed. * @return Returns the window id */ - long getLargestRecoveryWindow(); + long getLargestCompletedWindow(); /** + * Fetches the state saved for a window id for all the partitions. + * <p/> * When an operator can partition itself dynamically then there is no guarantee that an input state which was being * handled by one instance previously will be handled by the same instance after partitioning. <br/> * For eg. An {@link AbstractFileInputOperator} instance which reads a File X till offset l (not check-pointed) may no @@ -58,39 +82,27 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera * The new instance wouldn't know from what point to read the File X unless it reads the idempotent storage of all the * operators for the window being replayed and fix it's state. * - * @param windowId window id.
- * @return mapping of operator id to the corresponding state + * @param windowId window id + * @return saved state per operator partitions for the given window. * @throws IOException */ - Map<Integer, Object> load(long windowId) throws IOException; + Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException; /** - * Delete the artifacts of the operator for windows <= windowId. + * Delete the artifacts for windows <= windowId. * - * @param operatorId operator id * @param windowId window id * @throws IOException */ - void deleteUpTo(int operatorId, long windowId) throws IOException; + void committed(long windowId) throws IOException; /** - * This informs the idempotent storage manager that operator is partitioned so that it can set properties and - * distribute state. + * Creates new window data managers during repartitioning. * - * @param newManagers all the new idempotent storage managers. + * @param newCount count of new window data managers. * @param removedOperatorIds set of operator ids which were removed after partitioning. */ - void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds); - - /** - * Returns an array of windowIds for which data was stored by atleast one partition. The array - * of winodwIds is sorted. - * - * @return An array of windowIds for which data was stored by atleast one partition. The array - * of winodwIds is sorted. - * @throws IOException - */ - long[] getWindowIds() throws IOException; + List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds); /** * This {@link WindowDataManager} will never do recovery. 
This is a convenience class so that operators @@ -98,21 +110,25 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera */ class NoopWindowDataManager implements WindowDataManager { - @Override - public long getLargestRecoveryWindow() + public long getLargestCompletedWindow() { return Stateless.WINDOW_ID; } @Override - public Map<Integer, Object> load(long windowId) throws IOException + public Map<Integer, Object> retrieveAllPartitions(long windowId) throws IOException { return null; } @Override - public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds) + public List<WindowDataManager> partition(int newCount, Set<Integer> removedOperatorIds) { + List<WindowDataManager> managers = new ArrayList<>(); + for (int i = 0; i < newCount; i++) { + managers.add(new NoopWindowDataManager()); + } + return managers; } @Override @@ -126,36 +142,19 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera } @Override - public void save(Object object, int operatorId, long windowId) throws IOException + public void save(Object object, long windowId) throws IOException { } @Override - public Object load(int operatorId, long windowId) throws IOException + public Object retrieve(long windowId) throws IOException { return null; } @Override - public void delete(int operatorId, long windowId) throws IOException - { - } - - @Override - public void deleteUpTo(int operatorId, long windowId) throws IOException - { - } - - @Override - public long[] getWindowIds(int operatorId) throws IOException - { - return new long[0]; - } - - @Override - public long[] getWindowIds() throws IOException + public void committed(long windowId) throws IOException { - return new long[0]; } } } http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java ---------------------------------------------------------------------- diff 
--git a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java index 2f3f356..0f9a7c9 100644 --- a/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java +++ b/library/src/test/java/com/datatorrent/lib/db/jdbc/JdbcPojoPollableOpeartorTest.java @@ -178,8 +178,8 @@ public class JdbcPojoPollableOpeartorTest extends JdbcOperatorTest public void testRecovery() throws IOException { int operatorId = 1; - when(windowDataManagerMock.getLargestRecoveryWindow()).thenReturn(1L); - when(windowDataManagerMock.load(operatorId, 1)).thenReturn(new MutablePair<Integer, Integer>(0, 4)); + when(windowDataManagerMock.getLargestCompletedWindow()).thenReturn(1L); + when(windowDataManagerMock.retrieve(1)).thenReturn(new MutablePair<Integer, Integer>(0, 4)); insertEvents(10, true, 0); http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/1b4536ca/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java index 7990049..e1f23d1 100644 --- a/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java +++ b/library/src/test/java/com/datatorrent/lib/io/fs/AbstractFileInputOperatorTest.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import com.esotericsoftware.kryo.Kryo; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -617,7 +616,7 @@ public class AbstractFileInputOperatorTest LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); FSWindowDataManager manager = new 
FSWindowDataManager(); - manager.setRecoveryPath(testMeta.dir + "/recovery"); + manager.setStatePath(testMeta.dir + "/recovery"); oper.setWindowDataManager(manager); @@ -666,7 +665,7 @@ public class AbstractFileInputOperatorTest LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); FSWindowDataManager manager = new FSWindowDataManager(); - manager.setRecoveryPath(testMeta.dir + "/recovery"); + manager.setStatePath(testMeta.dir + "/recovery"); oper.setWindowDataManager(manager); @@ -709,7 +708,7 @@ public class AbstractFileInputOperatorTest LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); FSWindowDataManager manager = new FSWindowDataManager(); - manager.setRecoveryPath(testMeta.dir + "/recovery"); + manager.setStatePath(testMeta.dir + "/recovery"); oper.setEmitBatchSize(5); oper.setWindowDataManager(manager); @@ -772,7 +771,7 @@ public class AbstractFileInputOperatorTest LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); FSWindowDataManager manager = new FSWindowDataManager(); - manager.setRecoveryPath(testMeta.dir + "/recovery"); + manager.setStatePath(testMeta.dir + "/recovery"); oper.setWindowDataManager(manager); @@ -817,12 +816,12 @@ public class AbstractFileInputOperatorTest } @Test - public void testIdempotentStorageManagerPartitioning() throws Exception + public void testWindowDataManagerPartitioning() throws Exception { LineByLineFileInputOperator oper = new LineByLineFileInputOperator(); oper.getScanner().setFilePatternRegexp(".*partition([\\d]*)"); oper.setDirectory(new File(testMeta.dir).getAbsolutePath()); - oper.setWindowDataManager(new TestStorageManager()); + oper.setWindowDataManager(new FSWindowDataManager()); oper.operatorId = 7; Path path = new Path(new File(testMeta.dir).getAbsolutePath()); @@ -838,15 +837,15 @@ public class AbstractFileInputOperatorTest Assert.assertEquals(2, newPartitions.size()); Assert.assertEquals(1, oper.getCurrentPartitions()); - List<TestStorageManager> 
storageManagers = Lists.newLinkedList(); + List<FSWindowDataManager> storageManagers = Lists.newLinkedList(); for (Partition<AbstractFileInputOperator<String>> p : newPartitions) { - storageManagers.add((TestStorageManager)p.getPartitionedInstance().getWindowDataManager()); + storageManagers.add((FSWindowDataManager)p.getPartitionedInstance().getWindowDataManager()); } Assert.assertEquals("count of storage managers", 2, storageManagers.size()); int countOfDeleteManagers = 0; - TestStorageManager deleteManager = null; - for (TestStorageManager storageManager : storageManagers) { + FSWindowDataManager deleteManager = null; + for (FSWindowDataManager storageManager : storageManagers) { if (storageManager.getDeletedOperators() != null) { countOfDeleteManagers++; deleteManager = storageManager; @@ -858,17 +857,6 @@ public class AbstractFileInputOperatorTest Assert.assertEquals("deleted operators", Sets.newHashSet(7), deleteManager.getDeletedOperators()); } - private static class TestStorageManager extends FSWindowDataManager - { - Set<Integer> getDeletedOperators() - { - if (deletedOperators != null) { - return ImmutableSet.copyOf(deletedOperators); - } - return null; - } - } - /** scanner to extract partition id from start of the filename */ static class MyScanner extends AbstractFileInputOperator.DirectoryScanner {
