[ 
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14695944#comment-14695944
 ] 

ASF GitHub Bot commented on STORM-837:
--------------------------------------

Github user d2r commented on a diff in the pull request:

    https://github.com/apache/storm/pull/644#discussion_r37025284
  
    --- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java ---
    @@ -69,63 +78,92 @@
     
             abstract void doPrepare(Map conf, int partitionIndex, int numPartitions) throws IOException;
     
    -        protected void rotateOutputFile() throws IOException {
    +        abstract long getCurrentOffset() throws  IOException;
    +
    +        abstract void doCommit(Long txId) throws IOException;
    +
    +        abstract void doRecover(Path srcPath, long nBytes) throws Exception;
    +
    +        protected void rotateOutputFile(boolean doRotateAction) throws IOException {
                 LOG.info("Rotating output file...");
                 long start = System.currentTimeMillis();
    -            synchronized (this.writeLock) {
    -                closeOutputFile();
    -                this.rotation++;
    -
    -                Path newFile = createOutputFile();
    +            closeOutputFile();
    +            this.rotation++;
    +            Path newFile = createOutputFile();
    +            if (doRotateAction) {
                     LOG.info("Performing {} file rotation actions.", this.rotationActions.size());
                     for (RotationAction action : this.rotationActions) {
                         action.execute(this.fs, this.currentFile);
                     }
    -                this.currentFile = newFile;
                 }
    +            this.currentFile = newFile;
                 long time = System.currentTimeMillis() - start;
                 LOG.info("File rotation took {} ms.", time);
    +        }
     
    -
    +        protected void rotateOutputFile() throws IOException {
    +            rotateOutputFile(true);
             }
     
    -        void prepare(Map conf, int partitionIndex, int numPartitions){
    -            this.writeLock = new Object();
    -            if (this.rotationPolicy == null) throw new IllegalStateException("RotationPolicy must be specified.");
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions) {
    +            if (this.rotationPolicy == null) {
    +                throw new IllegalStateException("RotationPolicy must be specified.");
    +            } else if (this.rotationPolicy instanceof FileSizeRotationPolicy) {
    +                long rotationBytes = ((FileSizeRotationPolicy) rotationPolicy).getMaxBytes();
    +                LOG.warn("FileSizeRotationPolicy specified with {} bytes.", rotationBytes);
    +                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
    +                LOG.warn("Ensure that the data files does not grow too big with the FileSizeRotationPolicy.");
    +            } else if (this.rotationPolicy instanceof TimedRotationPolicy) {
    +                LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
    +                LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
    +                LOG.warn("Ensure that the data files does not grow too big with the TimedRotationPolicy.");
    +            }
                 if (this.fsUrl == null) {
                     throw new IllegalStateException("File system URL must be specified.");
                 }
                 this.fileNameFormat.prepare(conf, partitionIndex, numPartitions);
                 this.hdfsConfig = new Configuration();
    -            Map<String, Object> map = (Map<String, Object>)conf.get(this.configKey);
    -            if(map != null){
    -                for(String key : map.keySet()){
    +            Map<String, Object> map = (Map<String, Object>) conf.get(this.configKey);
    +            if (map != null) {
    +                for (String key : map.keySet()) {
                         this.hdfsConfig.set(key, String.valueOf(map.get(key)));
                     }
                 }
    -            try{
    +            try {
                     HdfsSecurityUtil.login(conf, hdfsConfig);
                     doPrepare(conf, partitionIndex, numPartitions);
                     this.currentFile = createOutputFile();
     
    -            } catch (Exception e){
    +            } catch (Exception e) {
                     throw new RuntimeException("Error preparing HdfsState: " + e.getMessage(), e);
                 }
     
    -            if(this.rotationPolicy instanceof TimedRotationPolicy){
    -                long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
    -                this.rotationTimer = new Timer(true);
    -                TimerTask task = new TimerTask() {
    -                    @Override
    -                    public void run() {
    -                        try {
    -                            rotateOutputFile();
    -                        } catch(IOException e){
    -                            LOG.warn("IOException during scheduled file rotation.", e);
    -                        }
    -                    }
    -                };
    -                this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
    +            if (this.rotationPolicy instanceof TimedRotationPolicy) {
    +                ((TimedRotationPolicy) this.rotationPolicy).start();
    +            }
    --- End diff --
    
    Referencing the [other comment above](https://github.com/apache/storm/pull/644/files#r37024551), TimedRotationPolicy#prepare could look like:
    ```Java
    LOG.warn("TimedRotationPolicy specified with interval {} ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
    LOG.warn("Recovery will fail if data files cannot be copied within topology.message.timeout.secs.");
    LOG.warn("Ensure that the data files do not grow too big with the TimedRotationPolicy.");
    
    super.prepare();
    
    this.rotationPolicy.start();
    ```
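
To make the suggestion above more concrete, here is a minimal, self-contained sketch of a rotation policy that owns its own timer, so that the caller only has to invoke `start()`. The class name `SketchTimedRotationPolicy` and the methods `checkAndClear()` and `stop()` are hypothetical and are not taken from the pull request; only `getInterval()` and `start()` mirror names that appear in the diff. The timer merely raises a flag here, on the assumption that the actual rotation would then run on the writing thread rather than inside the timer task.

```java
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch, not the Storm implementation.
class SketchTimedRotationPolicy {
    private final long intervalMs;
    // Set by the timer thread, read and cleared by the writing thread.
    private final AtomicBoolean rotationDue = new AtomicBoolean(false);
    private Timer timer;

    SketchTimedRotationPolicy(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.intervalMs = intervalMs;
    }

    long getInterval() {
        return intervalMs;
    }

    // Starts the daemon timer once; later calls are no-ops.
    synchronized void start() {
        if (timer != null) {
            return;
        }
        timer = new Timer(true);
        timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                rotationDue.set(true);
            }
        }, intervalMs, intervalMs);
    }

    // Returns true at most once per elapsed interval, then resets the flag.
    boolean checkAndClear() {
        return rotationDue.getAndSet(false);
    }

    // Cancels the timer so that no further intervals are scheduled.
    synchronized void stop() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }
}
```

With a shape like this, the state's `prepare` would no longer need to build a `Timer` or `TimerTask` itself, which is the simplification the comment is aiming at.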



> HdfsState ignores commits
> -------------------------
>
>                 Key: STORM-837
>                 URL: https://issues.apache.org/jira/browse/STORM-837
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Robert Joseph Evans
>            Assignee: Arun Mahadevan
>            Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once 
> processing.  It does this in two ways: first by informing the state about 
> commits so it can be sure the data is written out, and second by having a 
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with that ignores the 
> ids.  This means that if you use HdfsState and your worker crashes you may 
> both lose data and get some data twice.
> At a minimum the flush and file rotation should be tied to the commit in some 
> way.  The commit ID should at a minimum be written out with the data so 
> someone reading the data can have a hope of deduping it themselves.
> Also, with the rotationActions it is possible for a partially written file 
> to be leaked and never moved to its final location, because it is not 
> rotated.  I personally think the actions are too generic for this case and 
> need to be deprecated.
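
The commit handling described in the issue can be sketched roughly as follows. This is an illustration under stated assumptions, not the fix that was merged: the class `SketchTransactionalSink` and its `write` and `durableRecords` methods are hypothetical, the "durable" storage is an in-memory list standing in for an HDFS file, and the last committed id is kept in memory, whereas a real state would have to persist it alongside the data. Only the `beginCommit` and `commit` method names come from the issue text.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of commit-id-aware writing; not Storm code.
class SketchTransactionalSink {
    private final List<String> durable = new ArrayList<>(); // stand-in for flushed file contents
    private final List<String> pending = new ArrayList<>(); // buffered until commit
    private long lastCommittedTxId = -1L;
    private boolean replay;

    // Called before a batch; discards leftovers of a batch that never committed.
    void beginCommit(long txId) {
        pending.clear();
        replay = txId <= lastCommittedTxId;
    }

    // Buffers a record unless the current batch has already been committed.
    void write(String record) {
        if (!replay) {
            pending.add(record);
        }
    }

    // Makes the batch durable and tags each record with the commit id,
    // so that a downstream reader could also deduplicate on its own.
    void commit(long txId) {
        if (replay) {
            return; // double commit of an already written batch
        }
        for (String record : pending) {
            durable.add(txId + "\t" + record);
        }
        pending.clear();
        lastCommittedTxId = txId;
    }

    List<String> durableRecords() {
        return new ArrayList<>(durable);
    }
}
```

In this sketch a batch that is replayed with an id that was already committed is skipped, and a batch that was begun but never committed leaves nothing behind, which covers the "get some data twice" and "lose data" cases named above for this simplified in-memory model.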



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
