[
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696480#comment-14696480
]
ASF GitHub Bot commented on STORM-837:
--------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/644#discussion_r37049831
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
---
@@ -69,63 +78,92 @@
abstract void doPrepare(Map conf, int partitionIndex, int
numPartitions) throws IOException;
- protected void rotateOutputFile() throws IOException {
+ abstract long getCurrentOffset() throws IOException;
+
+ abstract void doCommit(Long txId) throws IOException;
+
+ abstract void doRecover(Path srcPath, long nBytes) throws
Exception;
+
+ protected void rotateOutputFile(boolean doRotateAction) throws
IOException {
LOG.info("Rotating output file...");
long start = System.currentTimeMillis();
- synchronized (this.writeLock) {
- closeOutputFile();
- this.rotation++;
-
- Path newFile = createOutputFile();
+ closeOutputFile();
+ this.rotation++;
+ Path newFile = createOutputFile();
+ if (doRotateAction) {
LOG.info("Performing {} file rotation actions.",
this.rotationActions.size());
for (RotationAction action : this.rotationActions) {
action.execute(this.fs, this.currentFile);
}
- this.currentFile = newFile;
}
+ this.currentFile = newFile;
long time = System.currentTimeMillis() - start;
LOG.info("File rotation took {} ms.", time);
+ }
-
+ protected void rotateOutputFile() throws IOException {
+ rotateOutputFile(true);
}
- void prepare(Map conf, int partitionIndex, int numPartitions){
- this.writeLock = new Object();
- if (this.rotationPolicy == null) throw new
IllegalStateException("RotationPolicy must be specified.");
+
+ void prepare(Map conf, int partitionIndex, int numPartitions) {
+ if (this.rotationPolicy == null) {
+ throw new IllegalStateException("RotationPolicy must be
specified.");
+ } else if (this.rotationPolicy instanceof
FileSizeRotationPolicy) {
+ long rotationBytes = ((FileSizeRotationPolicy)
rotationPolicy).getMaxBytes();
+ LOG.warn("FileSizeRotationPolicy specified with {}
bytes.", rotationBytes);
+ LOG.warn("Recovery will fail if data files cannot be
copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big
with the FileSizeRotationPolicy.");
+ } else if (this.rotationPolicy instanceof TimedRotationPolicy)
{
+ LOG.warn("TimedRotationPolicy specified with interval {}
ms.", ((TimedRotationPolicy) rotationPolicy).getInterval());
+ LOG.warn("Recovery will fail if data files cannot be
copied within topology.message.timeout.secs.");
+ LOG.warn("Ensure that the data files does not grow too big
with the TimedRotationPolicy.");
+ }
if (this.fsUrl == null) {
throw new IllegalStateException("File system URL must be
specified.");
}
this.fileNameFormat.prepare(conf, partitionIndex,
numPartitions);
this.hdfsConfig = new Configuration();
- Map<String, Object> map = (Map<String,
Object>)conf.get(this.configKey);
- if(map != null){
- for(String key : map.keySet()){
+ Map<String, Object> map = (Map<String, Object>)
conf.get(this.configKey);
+ if (map != null) {
+ for (String key : map.keySet()) {
this.hdfsConfig.set(key, String.valueOf(map.get(key)));
}
}
- try{
+ try {
HdfsSecurityUtil.login(conf, hdfsConfig);
doPrepare(conf, partitionIndex, numPartitions);
this.currentFile = createOutputFile();
- } catch (Exception e){
+ } catch (Exception e) {
throw new RuntimeException("Error preparing HdfsState: " +
e.getMessage(), e);
}
- if(this.rotationPolicy instanceof TimedRotationPolicy){
- long interval =
((TimedRotationPolicy)this.rotationPolicy).getInterval();
- this.rotationTimer = new Timer(true);
- TimerTask task = new TimerTask() {
- @Override
- public void run() {
- try {
- rotateOutputFile();
- } catch(IOException e){
- LOG.warn("IOException during scheduled file
rotation.", e);
- }
- }
- };
- this.rotationTimer.scheduleAtFixedRate(task, interval,
interval);
+ if (this.rotationPolicy instanceof TimedRotationPolicy) {
+ ((TimedRotationPolicy) this.rotationPolicy).start();
+ }
--- End diff --
Actually, the warnings are specific to HdfsState. Moving them into the
rotation policy would make the rotation policy aware of recovery, which might
not be ideal if the rotation policy class needs to be reused in some other
context.
Moving the start logic into the policy-specific prepare methods makes sense. I
will make this change.
> HdfsState ignores commits
> -------------------------
>
> Key: STORM-837
> URL: https://issues.apache.org/jira/browse/STORM-837
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Robert Joseph Evans
> Assignee: Arun Mahadevan
> Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once
> processing. Trident does this in two ways: first, by informing the state about
> commits so it can be sure the data is written out; and second, by supplying a
> commit ID so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with them the commit
> IDs. This means that if you use HdfsState and your worker crashes, you may
> both lose data and receive some data twice.
> At a minimum, flushing and file rotation should be tied to the commit in some
> way, and the commit ID should be written out with the data so that someone
> reading the data can at least attempt to deduplicate it themselves.
> Also, with the rotationActions it is possible for a partially written file to
> be leaked and never moved to its final location, because it is never
> rotated. I personally think the actions are too generic for this case and
> should be deprecated.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)