GitHub user ilooner commented on a diff in the pull request:
https://github.com/apache/apex-malhar/pull/322#discussion_r74372710
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
---
@@ -51,291 +66,607 @@
public class FSWindowDataManager implements WindowDataManager
{
private static final String DEF_RECOVERY_PATH = "idempotentState";
-
- protected transient FSStorageAgent storageAgent;
+ private static final String WAL_FILE_NAME = "wal";
/**
- * Recovery path relative to app path where state is saved.
+ * Recovery filePath relative to app filePath where state is saved.
*/
@NotNull
- private String recoveryPath;
+ private String recoveryPath = DEF_RECOVERY_PATH;
private boolean isRecoveryPathRelativeToAppPath = true;
/**
- * largest window for which there is recovery data across all physical
operator instances.
+ * This is not null only for one physical instance.<br/>
+ * It consists of operator ids which have been deleted but have some
state that can be replayed.
+ * Only one of the instances would be handling (modifying) the files
that belong to this state. <br/>
+ * The value is assigned during partitioning.
*/
- protected transient long largestRecoveryWindow;
+ private Set<Integer> deletedOperators;
+
+ private boolean repartitioned;
/**
- * This is not null only for one physical instance.<br/>
- * It consists of operator ids which have been deleted but have some
state that can be replayed.
- * Only one of the instances would be handling (modifying) the files
that belong to this state.
+ * Used when it is not necessary to replay every streaming/app window.
+ * Used by {@link IncrementalCheckpointManager}
*/
- protected Set<Integer> deletedOperators;
+ private boolean relyOnCheckpoints;
/**
- * Sorted mapping from window id to all the operators that have state to
replay for that window.
+ * largest window for which there is recovery data across all physical
operator instances.
*/
- protected final transient TreeMultimap<Long, Integer> replayState;
+ private transient long largestRecoveryWindow = Stateless.WINDOW_ID;
+
+ private final FSWindowReplayWAL wal = new FSWindowReplayWAL();
- protected transient FileSystem fs;
- protected transient Path appPath;
+ //operator id -> wals (sorted)
+ private final transient Map<Integer, FSWindowReplayWAL> readOnlyWals =
new HashMap<>();
+
+ private transient String statePath;
+ private transient int operatorId;
+
+ private final transient Kryo kryo = new Kryo();
+
+ private transient FileContext fileContext;
public FSWindowDataManager()
{
- replayState = TreeMultimap.create();
- largestRecoveryWindow = Stateless.WINDOW_ID;
- recoveryPath = DEF_RECOVERY_PATH;
+ kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
}
@Override
public void setup(Context.OperatorContext context)
{
- Configuration configuration = new Configuration();
+ operatorId = context.getId();
+
if (isRecoveryPathRelativeToAppPath) {
- appPath = new Path(context.getValue(DAG.APPLICATION_PATH) +
Path.SEPARATOR + recoveryPath);
+ statePath = context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR
+ recoveryPath;
} else {
- appPath = new Path(recoveryPath);
+ statePath = recoveryPath;
}
try {
- storageAgent = new FSStorageAgent(appPath.toString(), configuration);
+ fileContext = FileContextUtils.getFileContext(statePath);
+
setupWals(context.getValue(Context.OperatorContext.ACTIVATION_WINDOW_ID));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
- fs = FileSystem.newInstance(appPath.toUri(), configuration);
+ private void setupWals(long activationWindow) throws IOException
+ {
+ findFiles(wal, operatorId);
+ configureWal(wal, operatorId, !relyOnCheckpoints);
+
+ if (repartitioned) {
+ createReadOnlyWals();
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry :
readOnlyWals.entrySet()) {
+ findFiles(entry.getValue(), entry.getKey());
+ configureWal(entry.getValue(), entry.getKey(), true);
+ }
+ }
- if (fs.exists(appPath)) {
- FileStatus[] fileStatuses = fs.listStatus(appPath);
+ //find largest recovery window
+ if (!relyOnCheckpoints) {
+ long recoveryWindow = findLargestRecoveryWindow(wal, null);
+ //committed will not delete temp files so it is possible that when
reading from files, a smaller window
+ //than the activation window is found.
+ if (recoveryWindow > activationWindow) {
+ largestRecoveryWindow = recoveryWindow;
+ }
+ if (wal.getReader().getCurrentPointer() != null) {
+
wal.getWriter().setCurrentPointer(wal.getReader().getCurrentPointer().getCopy());
+ }
+ } else {
+ wal.walEndPointerAfterRecovery = wal.getWriter().getCurrentPointer();
+ largestRecoveryWindow = wal.getLastCheckpointedWindow();
+ }
- for (FileStatus operatorDirStatus : fileStatuses) {
- int operatorId =
Integer.parseInt(operatorDirStatus.getPath().getName());
+ if (repartitioned && largestRecoveryWindow > Stateless.WINDOW_ID) {
+ //find the min of max window ids: a downstream will not finish a
window until all the upstream have finished it.
+ for (Map.Entry<Integer, FSWindowReplayWAL> entry :
readOnlyWals.entrySet()) {
- for (FileStatus status :
fs.listStatus(operatorDirStatus.getPath())) {
- String fileName = status.getPath().getName();
- if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
- continue;
- }
- long windowId = Long.parseLong(fileName, 16);
- replayState.put(windowId, operatorId);
- if (windowId > largestRecoveryWindow) {
- largestRecoveryWindow = windowId;
- }
+ long recoveryWindow = Stateless.WINDOW_ID;
+ if (!relyOnCheckpoints) {
+ long window = findLargestRecoveryWindow(entry.getValue(), null);
+ if (window > activationWindow) {
+ recoveryWindow = window;
}
+ } else {
+ recoveryWindow = findLargestRecoveryWindow(entry.getValue(),
activationWindow);
+ }
+
+ if (recoveryWindow < largestRecoveryWindow) {
+ largestRecoveryWindow = recoveryWindow;
}
}
- } catch (IOException e) {
- throw new RuntimeException(e);
}
+
+ //reset readers
+ wal.getReader().seek(wal.walStartPointer);
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.getReader().seek(wal.walStartPointer);
+ }
+
+ wal.setup();
+ for (FSWindowReplayWAL wal : readOnlyWals.values()) {
+ wal.setup();
+ }
+
}
- @Override
- public void save(Object object, int operatorId, long windowId) throws
IOException
+ protected void createReadOnlyWals() throws IOException
{
- storageAgent.save(object, operatorId, windowId);
+ RemoteIterator<FileStatus> operatorsIter = fileContext.listStatus(new
Path(statePath));
+ while (operatorsIter.hasNext()) {
+ FileStatus status = operatorsIter.next();
+ int operatorId = Integer.parseInt(status.getPath().getName());
+
+ if (operatorId != this.operatorId) {
+ //create read-only wal for other partitions
+ FSWindowReplayWAL wal = new FSWindowReplayWAL(true);
+ readOnlyWals.put(operatorId, wal);
+ }
+ }
}
- @Override
- public Object load(int operatorId, long windowId) throws IOException
+ private void configureWal(FSWindowReplayWAL wal, int operatorId, boolean
updateWalState) throws IOException
{
- Set<Integer> operators = replayState.get(windowId);
- if (operators == null || !operators.contains(operatorId)) {
- return null;
+ String operatorDir = statePath + Path.SEPARATOR + operatorId;
+ wal.setFilePath(operatorDir + Path.SEPARATOR + WAL_FILE_NAME);
+ wal.fileContext = fileContext;
+
+ if (updateWalState) {
+ if (!wal.fileDescriptors.isEmpty()) {
+ SortedSet<Integer> sortedParts = wal.fileDescriptors.keySet();
+
+ wal.walStartPointer = new
FileSystemWAL.FileSystemWALPointer(sortedParts.first(), 0);
+
+ FSWindowReplayWAL.FileDescriptor last =
wal.fileDescriptors.get(sortedParts.last()).last();
+ if (last.isTmp) {
+ wal.tempPartFiles.put(last.part, last.filePath.toString());
+ }
+ }
}
- return storageAgent.load(operatorId, windowId);
}
- @Override
- public void delete(int operatorId, long windowId) throws IOException
+ private void findFiles(FSWindowReplayWAL wal, int operatorId) throws
IOException
{
- storageAgent.delete(operatorId, windowId);
+ String operatorDir = statePath + Path.SEPARATOR + operatorId;
+ Path operatorPath = new Path(operatorDir);
+ if (fileContext.util().exists(operatorPath)) {
+ RemoteIterator<FileStatus> walFilesIter =
fileContext.listStatus(operatorPath);
+
+ while (walFilesIter.hasNext()) {
+ FileStatus fileStatus = walFilesIter.next();
+ FSWindowReplayWAL.FileDescriptor descriptor =
FSWindowReplayWAL.FileDescriptor.create(fileStatus.getPath());
+ wal.fileDescriptors.put(descriptor.part, descriptor);
+ }
+ }
}
- @Override
- public Map<Integer, Object> load(long windowId) throws IOException
+ private long findLargestRecoveryWindow(FSWindowReplayWAL wal, Long
ceilingWindow) throws IOException
{
- Set<Integer> operators = replayState.get(windowId);
- if (operators == null) {
- return null;
- }
- Map<Integer, Object> data = Maps.newHashMap();
- for (int operatorId : operators) {
- data.put(operatorId, load(operatorId, windowId));
+ if (!wal.fileDescriptors.isEmpty()) {
+ FileSystemWAL.FileSystemWALReader reader = wal.getReader();
+
+ //to find the largest window, we only need to look at the last file.
+ NavigableSet<Integer> descendingParts = new
TreeSet<>(wal.fileDescriptors.keySet()).descendingSet();
+ for (int part : descendingParts) {
+ FSWindowReplayWAL.FileDescriptor last =
wal.fileDescriptors.get(part).last();
+ reader.seek(new FileSystemWAL.FileSystemWALPointer(last.part, 0));
+
+ long endOffset = -1;
+
+ long lastWindow = Stateless.WINDOW_ID;
+ Slice slice = readNext(reader);
+
+ while (slice != null) {
+ boolean skipComplete = skipNext(reader);
+ if (!skipComplete) {
+ //artifact not saved so this window was not finished.
+ break;
+ }
+ Slice windowSlice = slice;
+ long offset = reader.getCurrentPointer().getOffset();
+ slice = readNext(reader); //either null, deleted or next window
+
+ if (slice == null || !slice.equals(DELETED)) {
+ //delete entry not found so window was not deleted
+ long window = Longs.fromByteArray(windowSlice.toByteArray());
+
+ if (ceilingWindow != null && window > ceilingWindow) {
+ break;
+ }
+ endOffset = offset;
+ lastWindow = window;
--- End diff --
I'm not sure I understand how deletion works here. Should `lastWindow` be
rolled back to the previous window when we encounter `DELETED`?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---