[ https://issues.apache.org/jira/browse/APEXMALHAR-2063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15416641#comment-15416641 ]
ASF GitHub Bot commented on APEXMALHAR-2063: -------------------------------------------- Github user ilooner commented on a diff in the pull request: https://github.com/apache/apex-malhar/pull/322#discussion_r74373686 --- Diff: library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java --- @@ -51,291 +66,607 @@ public class FSWindowDataManager implements WindowDataManager { private static final String DEF_RECOVERY_PATH = "idempotentState"; - - protected transient FSStorageAgent storageAgent; + private static final String WAL_FILE_NAME = "wal"; /** - * Recovery path relative to app path where state is saved. + * Recovery filePath relative to app filePath where state is saved. */ @NotNull - private String recoveryPath; + private String recoveryPath = DEF_RECOVERY_PATH; private boolean isRecoveryPathRelativeToAppPath = true; /** - * largest window for which there is recovery data across all physical operator instances. + * This is not null only for one physical instance.<br/> + * It consists of operator ids which have been deleted but have some state that can be replayed. + * Only one of the instances would be handling (modifying) the files that belong to this state. <br/> + * The value is assigned during partitioning. */ - protected transient long largestRecoveryWindow; + private Set<Integer> deletedOperators; + + private boolean repartitioned; /** - * This is not null only for one physical instance.<br/> - * It consists of operator ids which have been deleted but have some state that can be replayed. - * Only one of the instances would be handling (modifying) the files that belong to this state. + * Used when it is not necessary to replay every streaming/app window. + * Used by {@link IncrementalCheckpointManager} */ - protected Set<Integer> deletedOperators; + private boolean relyOnCheckpoints; /** - * Sorted mapping from window id to all the operators that have state to replay for that window. 
+ * largest window for which there is recovery data across all physical operator instances. */ - protected final transient TreeMultimap<Long, Integer> replayState; + private transient long largestRecoveryWindow = Stateless.WINDOW_ID; + + private final FSWindowReplayWAL wal = new FSWindowReplayWAL(); - protected transient FileSystem fs; - protected transient Path appPath; + //operator id -> wals (sorted) + private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals = new HashMap<>(); + + private transient String statePath; + private transient int operatorId; + + private final transient Kryo kryo = new Kryo(); + + private transient FileContext fileContext; public FSWindowDataManager() { - replayState = TreeMultimap.create(); - largestRecoveryWindow = Stateless.WINDOW_ID; - recoveryPath = DEF_RECOVERY_PATH; + kryo.setClassLoader(Thread.currentThread().getContextClassLoader()); } @Override public void setup(Context.OperatorContext context) { - Configuration configuration = new Configuration(); + operatorId = context.getId(); + if (isRecoveryPathRelativeToAppPath) { - appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath); + statePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath; } else { - appPath = new Path(recoveryPath); + statePath = recoveryPath; } try { - storageAgent = new FSStorageAgent(appPath.toString(), configuration); + fileContext = FileContextUtils.getFileContext(statePath); + setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } - fs = FileSystem.newInstance(appPath.toUri(), configuration); + private void setupWals(long activationWindow) throws IOException + { + findFiles(wal, operatorId); + configureWal(wal, operatorId, !relyOnCheckpoints); + + if (repartitioned) { + createReadOnlyWals(); + for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) { + 
findFiles(entry.getValue(), entry.getKey()); + configureWal(entry.getValue(), entry.getKey(), true); + } + } - if (fs.exists(appPath)) { - FileStatus[] fileStatuses = fs.listStatus(appPath); + //find largest recovery window + if (!relyOnCheckpoints) { + long recoveryWindow = findLargestRecoveryWindow(wal, null); + //committed will not delete temp files so it is possible that when reading from files, a smaller window + //than the activation window is found. + if (recoveryWindow > activationWindow) { + largestRecoveryWindow = recoveryWindow; + } + if (wal.getReader().getCurrentPointer() != null) { + wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy()); + } + } else { + wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer(); + largestRecoveryWindow = wal.getLastCheckpointedWindow(); + } - for (FileStatus operatorDirStatus : fileStatuses) { - int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName()); + if (repartitioned && largestRecoveryWindow > Stateless.WINDOW_ID) { + //find the min of max window ids: a downstream will not finish a window until all the upstream have finished it. 
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry : readOnlyWals.entrySet()) { - for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) { - String fileName = status.getPath().getName(); - if (fileName.endsWith(FSStorageAgent.TMP_FILE)) { - continue; - } - long windowId = Long.parseLong(fileName, 16); - replayState.put(windowId, operatorId); - if (windowId > largestRecoveryWindow) { - largestRecoveryWindow = windowId; - } + long recoveryWindow = Stateless.WINDOW_ID; + if (!relyOnCheckpoints) { + long window = findLargestRecoveryWindow(entry.getValue(), null); + if (window > activationWindow) { + recoveryWindow = window; } + } else { + recoveryWindow = findLargestRecoveryWindow(entry.getValue(), activationWindow); + } + + if (recoveryWindow < largestRecoveryWindow) { + largestRecoveryWindow = recoveryWindow; } } - } catch (IOException e) { - throw new RuntimeException(e); } + + //reset readers + wal.getReader().seek(wal.walStartPointer); + for (FSWindowReplayWAL wal : readOnlyWals.values()) { + wal.getReader().seek(wal.walStartPointer); + } + + wal.setup(); + for (FSWindowReplayWAL wal : readOnlyWals.values()) { + wal.setup(); + } + } - @Override - public void save(Object object, int operatorId, long windowId) throws IOException + protected void createReadOnlyWals() throws IOException { - storageAgent.save(object, operatorId, windowId); + RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new Path(statePath)); + while (operatorsIter.hasNext()) { + FileStatus status = operatorsIter.next(); + int operatorId = Integer.parseInt(status.getPath().getName()); + + if (operatorId != this.operatorId) { + //create read-only wal for other partitions + FSWindowReplayWAL wal = new FSWindowReplayWAL(true); + readOnlyWals.put(operatorId, wal); + } + } } - @Override - public Object load(int operatorId, long windowId) throws IOException + private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean updateWalState) throws IOException { - 
Set<Integer> operators = replayState.get(windowId); - if (operators == null || !operators.contains(operatorId)) { - return null; + String operatorDir = statePath + Path.SEPARATOR + operatorId; + wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME); + wal.fileContext = fileContext; + + if (updateWalState) { + if (!wal.fileDescriptors.isEmpty()) { + SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet(); + + wal.walStartPointer = new FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0); + + FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(sortedParts.last()).last(); + if (last.isTmp) { + wal.tempPartFiles.put(last.part, last.filePath.toString()); + } + } } - return storageAgent.load(operatorId, windowId); } - @Override - public void delete(int operatorId, long windowId) throws IOException + private void findFiles(FSWindowReplayWAL wal, int operatorId) throws IOException { - storageAgent.delete(operatorId, windowId); + String operatorDir = statePath + Path.SEPARATOR + operatorId; + Path operatorPath = new Path(operatorDir); + if (fileContext.util().exists(operatorPath)) { + RemoteIterator<FileStatus> walFilesIter = fileContext.listStatus(operatorPath); + + while (walFilesIter.hasNext()) { + FileStatus fileStatus = walFilesIter.next(); + FSWindowReplayWAL.FileDescriptor descriptor = FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath()); + wal.fileDescriptors.put(descriptor.part, descriptor); + } + } } - @Override - public Map<Integer, Object> load(long windowId) throws IOException + private long findLargestRecoveryWindow(FSWindowReplayWAL wal, Long ceilingWindow) throws IOException { - Set<Integer> operators = replayState.get(windowId); - if (operators == null) { - return null; - } - Map<Integer, Object> data = Maps.newHashMap(); - for (int operatorId : operators) { - data.put(operatorId, load(operatorId, windowId)); + if (!wal.fileDescriptors.isEmpty()) { + FileSystemWAL.FileSystemWALReader reader = wal.getReader(); + + 
//to find the largest window, we only need to look at the last file. + NavigableSet<Integer> descendingParts = new TreeSet<>(wal.fileDescriptors.keySet()).descendingSet(); + for (int part : descendingParts) { + FSWindowReplayWAL.FileDescriptor last = wal.fileDescriptors.get(part).last(); + reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0)); + + long endOffset = -1; + + long lastWindow = Stateless.WINDOW_ID; + Slice slice = readNext(reader); + + while (slice != null) { + boolean skipComplete = skipNext(reader); + if (!skipComplete) { + //artifact not saved so this window was not finished. + break; + } + Slice windowSlice = slice; + long offset = reader.getCurrentPointer().getOffset(); + slice = readNext(reader); //either null, deleted or next window + + if (slice == null || !slice.equals(DELETED)) { + //delete entry not found so window was not deleted + long window = Longs.fromByteArray(windowSlice.toByteArray()); + + if (ceilingWindow != null && window > ceilingWindow) { + break; + } + endOffset = offset; + lastWindow = window; --- End diff -- Would the endoffset have to be rolled back as well? > Integrate WAL to FS WindowDataManager > ------------------------------------- > > Key: APEXMALHAR-2063 > URL: https://issues.apache.org/jira/browse/APEXMALHAR-2063 > Project: Apache Apex Malhar > Issue Type: Improvement > Reporter: Chandni Singh > Assignee: Chandni Singh > > FS Window Data Manager is used to save meta-data that helps in replaying > the tuples of every completed application window after a failure. For this, it saves > meta-data in one file per window. Having many small files on HDFS > causes issues, as highlighted here: > http://blog.cloudera.com/blog/2009/02/the-small-files-problem/ > Instead, FS Window Data Manager can use the WAL to write data and maintain > a mapping of how much data was flushed to the WAL in each window.
> In order to use FileSystemWAL for replaying data of a finished window, a few > changes are made to FileSystemWAL, for the following reasons: > 1. WindowDataManager needs to replay data of every finished window, and such a > window may not have been checkpointed yet. > FileSystemWAL truncates the WAL file to the checkpointed point after recovery, > which poses a problem. > WindowDataManager should therefore be able to control the recovery of FileSystemWAL. > 2. FileSystemWAL writes to temporary files. The mapping of temp files to > actual files is part of its state, which is checkpointed. Since > WindowDataManager replays data of a window that is not yet checkpointed, it needs to > know the actual temporary file the data is being persisted to. -- This message was sent by Atlassian JIRA (v6.3.4#6332)