Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/644#discussion_r37049831
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
---
@@ -69,63 +78,92 @@
abstract void doPrepare(Map conf, int partitionIndex, int
numPartitions) throws IOException;
- protected void rotateOutputFile() throws IOException {
+ abstract long getCurrentOffset() throws IOException;
+
+ abstract void doCommit(Long txId) throws IOException;
+
+ abstract void doRecover(Path srcPath, long nBytes) throws
Exception;
+
+ protected void rotateOutputFile(boolean doRotateAction) throws
IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
- synchronized (this.writeLock) {
- closeOutputFile();
- this.rotation++;
-
- Path newFile = createOutputFile();
+ closeOutputFile();
+ this.rotation++;
+ Path newFile = createOutputFile();
+ if (doRotateAction) {
LOG.info("Performing {} file rotation actions.",
this.rotationActions.size());
for (RotationAction action : this.rotationActions) {
action.execute(this.fs, this.currentFile);
}
- this.currentFile = newFile;
}
+ this.currentFile = newFile;
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms.", time);
+ }
-
+ protected void rotateOutputFile() throws IOException {
+ rotateOutputFile(true);
}
- void prepare(Map conf, int partitionIndex, int numPartitions){
- this.writeLock = new Object();
- if (this.rotationPolicy == null) throw new
IllegalStateException("RotationPolicy must be specified.");
+
+ void prepare(Map conf, int partitionIndex, int numPartitions) {
+ if (this.rotationPolicy == null) {
+ throw new IllegalStateException("RotationPolicy must be
specified.");
+ } else if (this.rotationPolicy instanceof
FileSizeRotationPolicy) {
+ long rotationBytes = ((FileSizeRotationPolicy)
rotationPolicy).getMaxBytes();
+ LOG.warn("FileSizeRotationPolicy specified with {}
bytes.", rotationBytes);
+ LOG.warn("Recovery will fail if data files cannot be
copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big
with the FileSizeRotationPolicy.");
+ } else if (this.rotationPolicy instanceof TimedRotationPolicy)
{
+ LOG.warn("TimedRotationPolicy specified with interval {}
ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+ LOG.warn("Recovery will fail if data files cannot be
copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big
with the TimedRotationPolicy.");
+ }
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be
specified.");
}
this.fileNameFormat.prepare(conf, partitionIndex,
numPartitions);
this.hdfsConfig = new Configuration();
- Map<String, Object> map = (Map<String,
Object>)conf.get(this.configKey);
- if(map != null){
- for(String key : map.keySet()){
+ Map<String, Object> map = (Map<String, Object>)
conf.get(this.configKey);
+ if (map != null) {
+ for (String key : map.keySet()) {
this.hdfsConfig.set(key, String.valueOf(map.get(key)));
}
}
- try{
+ try {
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, partitionIndex, numPartitions);
this.currentFile = createOutputFile();
- } catch (Exception e){
+ } catch (Exception e) {
throw new RuntimeException("Error preparing HdfsState: " +
e.getMessage(), e);
}
- if(this.rotationPolicy instanceof TimedRotationPolicy){
- long interval =
((TimedRotationPolicy)this.rotationPolicy).getInterval();
- this.rotationTimer = new Timer(true);
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- try {
- rotateOutputFile();
- } catch(IOException e){
- LOG.warn("IOException during scheduled file
rotation.", e);
- }
- }
- };
- this.rotationTimer.scheduleAtFixedRate(task, interval,
interval);
+ if (this.rotationPolicy instanceof TimedRotationPolicy) {
+ ((TimedRotationPolicy) this.rotationPolicy).start();
+ }
--- End diff --
Actually, the warnings are specific to HdfsState. Moving them into the
rotation policy would make the rotation policy aware of recovery. That is
not ideal if the rotation policy class needs to be reused in some other
context.
Moving the start logic into the policy-specific prepare methods makes sense;
I will make this change.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---