[ https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696290#comment-14696290 ]

ASF GitHub Bot commented on STORM-837:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/644#discussion_r37043067
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -288,47 +391,168 @@ void closeOutputFile() throws IOException {
     
             @Override
             public void execute(List<TridentTuple> tuples) throws IOException {
    -            long offset;
    -            for(TridentTuple tuple : tuples) {
    -                synchronized (this.writeLock) {
    -                    this.writer.append(this.format.key(tuple), this.format.value(tuple));
    -                    offset = this.writer.getLength();
    -                }
    -
    -                if (this.rotationPolicy.mark(tuple, offset)) {
    -                    rotateOutputFile();
    -                    this.rotationPolicy.reset();
    -                }
    +            for (TridentTuple tuple : tuples) {
    +                this.writer.append(this.format.key(tuple), this.format.value(tuple));
                 }
             }
     
         }
     
    +    /**
    +     * TxnRecord [txnid, data_file_path, data_file_offset]
    +     * <p>
    +     * This is written to the index file during beginCommit() and used for recovery.
    +     * </p>
    +     */
    +    private static class TxnRecord {
    +        private long txnid;
    +        private String dataFilePath;
    +        private long offset;
    +
    +        private TxnRecord(long txnId, String dataFilePath, long offset) {
    +            this.txnid = txnId;
    +            this.dataFilePath = dataFilePath;
    +            this.offset = offset;
    +        }
    +
    +        @Override
    +        public String toString() {
    +            return Long.toString(txnid) + "," + dataFilePath + "," + Long.toString(offset);
    +        }
    +    }
    +
    +
         public static final Logger LOG = LoggerFactory.getLogger(HdfsState.class);
         private Options options;
    +    private volatile TxnRecord lastSeenTxn;
    +    private Path indexFilePath;
     
    -    HdfsState(Options options){
    +    HdfsState(Options options) {
             this.options = options;
         }
     
    -    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions){
    +    void prepare(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
             this.options.prepare(conf, partitionIndex, numPartitions);
    +        initLastTxn(conf, partitionIndex);
    +    }
    +
    +    private TxnRecord readTxnRecord(Path path) throws IOException {
    +        FSDataInputStream inputStream = null;
    +        try {
    +            inputStream = this.options.fs.open(path);
    +            BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
    +            String line;
    +            if ((line = reader.readLine()) != null) {
    +                String[] fields = line.split(",");
    +                return new TxnRecord(Long.valueOf(fields[0]), fields[1], Long.valueOf(fields[2]));
    +            }
    +        } finally {
    +            if (inputStream != null) {
    +                inputStream.close();
    +            }
    +        }
    +        return new TxnRecord(0, options.currentFile.toString(), 0);
    +    }
    +
    +    /**
    +     * Reads the last txn record from the index file if it exists, otherwise
    +     * from the .tmp file if that exists.
    +     *
    +     * @param indexFilePath the index file path
    +     * @return the txn record from the index file or a default initial record.
    +     * @throws IOException
    +     */
    +    private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
    +        Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
    --- End diff ---
    
    It would be good to have this pulled into its own method, just so that we 
do not have ".tmp" hard-coded in two spots.
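
    For illustration, a minimal sketch of the kind of helper this suggests (the
    method name tmpFilePath and the TMP_EXTENSION constant are hypothetical, not
    part of the patch):

        private static final String TMP_EXTENSION = ".tmp";

        // Derive the temporary index file path from the index file path, so the
        // ".tmp" suffix is defined in exactly one place.
        private Path tmpFilePath(Path indexFilePath) {
            return new Path(indexFilePath.toString() + TMP_EXTENSION);
        }

    getTxnRecord() and any other call site could then use
    Path tmpPath = tmpFilePath(indexFilePath); instead of repeating the literal.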


> HdfsState ignores commits
> -------------------------
>
>                 Key: STORM-837
>                 URL: https://issues.apache.org/jira/browse/STORM-837
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Robert Joseph Evans
>            Assignee: Arun Mahadevan
>            Priority: Critical
>
> HdfsState works with trident, which is supposed to provide exactly-once
> processing.  It does this in two ways: first by informing the state about
> commits so it can be sure the data is written out, and second by providing a
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with them the commit
> ids.  This means that if you use HdfsState and your worker crashes, you may
> both lose data and get some data twice.
> At a minimum the flush and file rotation should be tied to the commit in some 
> way.  The commit ID should at a minimum be written out with the data so 
> someone reading the data can have a hope of deduping it themselves.
> Also, with the rotationActions it is possible for a partially written file to
> be leaked and never moved to its final location, because it is never rotated.
> I personally think the actions are too generic for this case and need to be
> deprecated.
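
As a rough illustration of what the description asks for, a Trident State that
honors the transaction lifecycle hooks the batch into beginCommit()/commit()
along these lines (a minimal sketch with the intended behavior in comments only;
the actual fix in the pull request differs in detail):

    // Trident invokes these around every batch; txId identifies the transaction.
    @Override
    public void beginCommit(Long txId) {
        // A txId at or below the last recorded one means this batch is a replay;
        // its tuples can be skipped (or the file truncated) to keep exactly-once
        // semantics.
    }

    @Override
    public void commit(Long txId) {
        // Flush/sync the data written for this batch so it is durable in HDFS,
        // record txId (e.g. in an index file) for recovery and de-duplication,
        // and drive file rotation from here rather than from an unrelated
        // policy, so partially written files are not leaked on a crash.
    }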



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
