[
https://issues.apache.org/jira/browse/APEXMALHAR-1965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15249325#comment-15249325
]
ASF GitHub Bot commented on APEXMALHAR-1965:
--------------------------------------------
Github user chandnisingh commented on a diff in the pull request:
https://github.com/apache/incubator-apex-malhar/pull/242#discussion_r60353719
--- Diff:
library/src/main/java/org/apache/apex/malhar/lib/wal/FileSystemWAL.java ---
@@ -0,0 +1,594 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.wal;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.apex.malhar.lib.utils.IOUtils;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.local.LocalFs;
+import org.apache.hadoop.fs.local.RawLocalFs;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.netlet.util.Slice;
+
+public class FileSystemWAL implements
WAL<FileSystemWAL.FileSystemWALReader, FileSystemWAL.FileSystemWALWriter>
+{
+
+ @NotNull
+ private String filePath;
+
+ //max length of the file
+ @Min(0)
+ private long maxLength;
+
+ @NotNull
+ private FileSystemWAL.FileSystemWALReader fileSystemWALReader = new
FileSystemWALReader(this);
+
+ @NotNull
+ private FileSystemWAL.FileSystemWALWriter fileSystemWALWriter = new
FileSystemWALWriter(this);
+
+ //part => tmp file path;
+ private final ConcurrentSkipListMap<Integer, String> tempPartFiles = new
ConcurrentSkipListMap<>();
+
+ private long lastCheckpointedWindow = Stateless.WINDOW_ID;
+
+ @Override
+ public void setup()
+ {
+ try {
+ FileContext fileContext = FileContextUtils.getFileContext(filePath);
+ if (maxLength == 0) {
+ maxLength =
fileContext.getDefaultFileSystem().getServerDefaults().getBlockSize();
+ }
+ fileSystemWALWriter.open(fileContext);
+ fileSystemWALReader.open(fileContext);
+ } catch (IOException e) {
+ throw new RuntimeException("during setup", e);
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long window)
+ {
+ try {
+ lastCheckpointedWindow = window;
+ fileSystemWALWriter.flush();
+ } catch (IOException e) {
+ throw new RuntimeException("during before cp", e);
+ }
+ }
+
+ @Override
+ public void committed(long window)
+ {
+ try {
+ fileSystemWALWriter.finalizeFiles(window);
+ } catch (IOException e) {
+ throw new RuntimeException("during committed", e);
+ }
+ }
+
+ @Override
+ public void teardown()
+ {
+ try {
+ fileSystemWALReader.close();
+ fileSystemWALWriter.close();
+ } catch (IOException e) {
+ throw new RuntimeException("during teardown", e);
+ }
+ }
+
+ protected long getLastCheckpointedWindow()
+ {
+ return lastCheckpointedWindow;
+ }
+
+ protected String getPartFilePath(int partNumber)
+ {
+ return filePath + "_" + partNumber;
+ }
+
+ @Override
+ public FileSystemWALReader getReader()
+ {
+ return fileSystemWALReader;
+ }
+
+ /**
+ * Sets the File System WAL Reader. This can be used to override the
default wal reader.
+ *
+ * @param fileSystemWALReader wal reader.
+ */
+ public void setFileSystemWALReader(@NotNull FileSystemWALReader
fileSystemWALReader)
+ {
+ this.fileSystemWALReader =
Preconditions.checkNotNull(fileSystemWALReader, "filesystem wal reader");
+ }
+
+ @Override
+ public FileSystemWALWriter getWriter()
+ {
+ return fileSystemWALWriter;
+ }
+
+ /**
+ * Sets the File System WAL Writer. This can be used to override the
default wal writer.
+ *
+ * @param fileSystemWALWriter wal writer.
+ */
+ public void setFileSystemWALWriter(@NotNull FileSystemWALWriter
fileSystemWALWriter)
+ {
+ this.fileSystemWALWriter =
Preconditions.checkNotNull(fileSystemWALWriter, "filesystem wal writer");
+ }
+
+ /**
+ * @return WAL file path
+ */
+ public String getFilePath()
+ {
+ return filePath;
+ }
+
+ /**
+ * Sets the WAL file path.
+ *
+ * @param filePath WAL file path
+ */
+ public void setFilePath(@NotNull String filePath)
+ {
+ this.filePath = Preconditions.checkNotNull(filePath, "filePath");
+ }
+
+ /**
+ * @return max length of a WAL part file.
+ */
+ public long getMaxLength()
+ {
+ return maxLength;
+ }
+
+ /**
+ * Sets the maximum length of a WAL part file.
+ *
+ * @param maxLength max length of the WAL part file
+ */
+ public void setMaxLength(long maxLength)
+ {
+ this.maxLength = maxLength;
+ }
+
+ public static class FileSystemWALPointer implements
Comparable<FileSystemWALPointer>
+ {
+ private final int partNum;
+ private long offset;
+
+ private FileSystemWALPointer()
+ {
+ //for kryo
+ partNum = -1;
+ }
+
+ public FileSystemWALPointer(long offset)
+ {
+ this(0, offset);
+ }
+
+ public FileSystemWALPointer(int partNum, long offset)
+ {
+ this.partNum = partNum;
+ this.offset = offset;
+ }
+
+ @Override
+ public int compareTo(@NotNull FileSystemWALPointer o)
+ {
+ if (this.partNum < o.partNum) {
+ return -1;
+ }
+ if (this.partNum > o.partNum) {
+ return 1;
+ }
+ if (this.offset < o.offset) {
+ return -1;
+ }
+ if (this.offset > o.offset) {
+ return 1;
+ }
+ return 0;
+ }
+
+ public int getPartNum()
+ {
+ return partNum;
+ }
+
+ public long getOffset()
+ {
+ return offset;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "FileSystemWalPointer{" + "partNum=" + partNum + ", offset="
+ offset + '}';
+ }
+ }
+
+ /**
+ * A FileSystem Wal Reader
+ */
+ public static class FileSystemWALReader implements
WAL.WALReader<FileSystemWALPointer>
+ {
+ private FileSystemWALPointer currentPointer = new
FileSystemWALPointer(0, 0);
+
+ private transient DataInputStream inputStream;
+ private transient Path currentOpenPath;
+
+ private final FileSystemWAL fileSystemWAL;
+ private transient FileContext fileContext;
+
+ private FileSystemWALReader()
+ {
+ //for kryo
+ fileSystemWAL = null;
+ }
+
+ public FileSystemWALReader(@NotNull FileSystemWAL fileSystemWal)
+ {
+ this.fileSystemWAL = Preconditions.checkNotNull(fileSystemWal,
"wal");
+ }
+
+ protected void open(@NotNull FileContext fileContext) throws
IOException
+ {
+ this.fileContext = Preconditions.checkNotNull(fileContext,
"fileContext");
+ }
+
+ protected void close() throws IOException
+ {
+ if (inputStream != null) {
+ inputStream.close();
+ inputStream = null;
+ }
+ }
+
+ @Override
+ public void seek(FileSystemWALPointer pointer) throws IOException
+ {
+ if (inputStream != null) {
+ close();
+ }
+ inputStream = getInputStream(pointer);
+ Preconditions.checkNotNull(inputStream, "invalid pointer " +
pointer);
+ currentPointer = pointer;
+ }
+
+ /**
+ * Move to the next WAL segment.
+ *
+ * @return true if the next part file exists and is opened; false
otherwise.
+ * @throws IOException
+ */
+ private boolean nextSegment() throws IOException
+ {
+ if (inputStream != null) {
+ close();
+ }
+
+ currentPointer = new FileSystemWALPointer(currentPointer.partNum +
1, 0);
+ inputStream = getInputStream(currentPointer);
+
+ return inputStream != null;
+ }
+
+ private DataInputStream getInputStream(FileSystemWALPointer
walPointer) throws IOException
+ {
+ Preconditions.checkArgument(inputStream == null, "input stream not
null");
+ Path pathToReadFrom;
+ String tmpPath =
fileSystemWAL.tempPartFiles.get(walPointer.getPartNum());
+ if (tmpPath != null) {
+ pathToReadFrom = new Path(tmpPath);
+ } else {
+ pathToReadFrom = new
Path(fileSystemWAL.getPartFilePath(walPointer.partNum));
+ }
+
+ LOG.debug("path to read {} and pointer {}", pathToReadFrom,
walPointer);
+ if (fileContext.util().exists(pathToReadFrom)) {
+ DataInputStream stream = fileContext.open(pathToReadFrom);
+ if (walPointer.offset > 0) {
+ stream.skip(walPointer.offset);
+ }
+ currentOpenPath = pathToReadFrom;
+ return stream;
+ }
+ return null;
+ }
+
+ @Override
+ public Slice next() throws IOException
+ {
+ do {
+ if (inputStream == null) {
+ inputStream = getInputStream(currentPointer);
+ }
+
+ if (inputStream != null &&
!fileContext.util().exists(currentOpenPath)) {
--- End diff --
OK, will do that. I was trying to minimize the overlap between the reader and
the writer.
> Create a WAL in Malhar
> ----------------------
>
> Key: APEXMALHAR-1965
> URL: https://issues.apache.org/jira/browse/APEXMALHAR-1965
> Project: Apache Apex Malhar
> Issue Type: Task
> Reporter: Chandni Singh
> Assignee: Tushar Gosavi
>
> In Malhar we have an IdempotentStorageManager, which we use like a Write Ahead
> Logger. In some other places we have also created different flavors of
> Write Ahead Logger.
> We need to find the overlap between all these flavors and create a common Write
> Ahead Logger for use in Apex Core and Apex Malhar.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)