Github user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/storm/pull/644#discussion_r37049831
  
    --- Diff: 
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java 
---
    @@ -69,63 +78,92 @@
     
             abstract void doPrepare(Map conf, int partitionIndex, int 
numPartitions) throws IOException;
     
    -        protected void rotateOutputFile() throws IOException {
    +        abstract long getCurrentOffset() throws  IOException;
    +
    +        abstract void doCommit(Long txId) throws IOException;
    +
    +        abstract void doRecover(Path srcPath, long nBytes) throws 
Exception;
    +
    +        protected void rotateOutputFile(boolean doRotateAction) throws 
IOException {
                 LOG.info("Rotating output file...");
                 long start = System.currentTimeMillis();
    -            synchronized (this.writeLock) {
    -                closeOutputFile();
    -                this.rotation++;
    -
    -                Path newFile = createOutputFile();
    +            closeOutputFile();
    +            this.rotation++;
    +            Path newFile = createOutputFile();
    +            if (doRotateAction) {
                     LOG.info("Performing {} file rotation actions.", 
this.rotationActions.size());
                     for (RotationAction action : this.rotationActions) {
                         action.execute(this.fs, this.currentFile);
                     }
    -                this.currentFile = newFile;
                 }
    +            this.currentFile = newFile;
                 long time = System.currentTimeMillis() - start;
                 LOG.info("File rotation took {} ms.", time);
    +        }
     
    -
    +        protected void rotateOutputFile() throws IOException {
    +            rotateOutputFile(true);
             }
     
    -        void prepare(Map conf, int partitionIndex, int numPartitions){
    -            this.writeLock = new Object();
    -            if (this.rotationPolicy == null) throw new 
IllegalStateException("RotationPolicy must be specified.");
    +
    +        void prepare(Map conf, int partitionIndex, int numPartitions) {
    +            if (this.rotationPolicy == null) {
    +                throw new IllegalStateException("RotationPolicy must be 
specified.");
    +            } else if (this.rotationPolicy instanceof 
FileSizeRotationPolicy) {
    +                long rotationBytes = ((FileSizeRotationPolicy) 
rotationPolicy).getMaxBytes();
    +                LOG.warn("FileSizeRotationPolicy specified with {} 
bytes.", rotationBytes);
    +                LOG.warn("Recovery will fail if data files cannot be 
copied within topology.message.timeout.secs.");
    +                LOG.warn("Ensure that the data files does not grow too big 
with the FileSizeRotationPolicy.");
    +            } else if (this.rotationPolicy instanceof TimedRotationPolicy) 
{
    +                LOG.warn("TimedRotationPolicy specified with interval {} 
ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
    +                LOG.warn("Recovery will fail if data files cannot be 
copied within topology.message.timeout.secs.");
    +                LOG.warn("Ensure that the data files does not grow too big 
with the TimedRotationPolicy.");
    +            }
                 if (this.fsUrl == null) {
                     throw new IllegalStateException("File system URL must be 
specified.");
                 }
                 this.fileNameFormat.prepare(conf, partitionIndex, 
numPartitions);
                 this.hdfsConfig = new Configuration();
    -            Map<String, Object> map = (Map<String, 
Object>)conf.get(this.configKey);
    -            if(map != null){
    -                for(String key : map.keySet()){
    +            Map<String, Object> map = (Map<String, Object>) 
conf.get(this.configKey);
    +            if (map != null) {
    +                for (String key : map.keySet()) {
                         this.hdfsConfig.set(key, String.valueOf(map.get(key)));
                     }
                 }
    -            try{
    +            try {
                     HdfsSecurityUtil.login(conf, hdfsConfig);
                     doPrepare(conf, partitionIndex, numPartitions);
                     this.currentFile = createOutputFile();
     
    -            } catch (Exception e){
    +            } catch (Exception e) {
                     throw new RuntimeException("Error preparing HdfsState: " + 
e.getMessage(), e);
                 }
     
    -            if(this.rotationPolicy instanceof TimedRotationPolicy){
    -                long interval = 
((TimedRotationPolicy)this.rotationPolicy).getInterval();
    -                this.rotationTimer = new Timer(true);
    -                TimerTask task = new TimerTask() {
    -                    @Override
    -                    public void run() {
    -                        try {
    -                            rotateOutputFile();
    -                        } catch(IOException e){
    -                            LOG.warn("IOException during scheduled file 
rotation.", e);
    -                        }
    -                    }
    -                };
    -                this.rotationTimer.scheduleAtFixedRate(task, interval, 
interval);
    +            if (this.rotationPolicy instanceof TimedRotationPolicy) {
    +                ((TimedRotationPolicy) this.rotationPolicy).start();
    +            }
    --- End diff --
    
    Actually, the warnings are specific to the HDFS state, and by moving them 
to the rotation policy we make the rotation policy aware of recovery, which 
might not be ideal if the rotation policy class needs to be reused in some 
other context.
    
    Moving the start logic to policy-specific prepare methods makes sense. I 
will make this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to