[
https://issues.apache.org/jira/browse/STORM-837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14696481#comment-14696481
]
ASF GitHub Bot commented on STORM-837:
--------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/644#discussion_r37049859
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/trident/HdfsState.java
---
@@ -288,47 +391,168 @@ void closeOutputFile() throws IOException {
@Override
public void execute(List<TridentTuple> tuples) throws IOException {
- long offset;
- for(TridentTuple tuple : tuples) {
- synchronized (this.writeLock) {
- this.writer.append(this.format.key(tuple),
this.format.value(tuple));
- offset = this.writer.getLength();
- }
-
- if (this.rotationPolicy.mark(tuple, offset)) {
- rotateOutputFile();
- this.rotationPolicy.reset();
- }
+ for (TridentTuple tuple : tuples) {
+ this.writer.append(this.format.key(tuple),
this.format.value(tuple));
}
}
}
+ /**
+ * TxnRecord [txnid, data_file_path, data_file_offset]
+ * <p>
+ * This is written to the index file during beginCommit() and used for
recovery.
+ * </p>
+ */
+ private static class TxnRecord {
+ private long txnid;
+ private String dataFilePath;
+ private long offset;
+
+ private TxnRecord(long txnId, String dataFilePath, long offset) {
+ this.txnid = txnId;
+ this.dataFilePath = dataFilePath;
+ this.offset = offset;
+ }
+
+ @Override
+ public String toString() {
+ return Long.toString(txnid) + "," + dataFilePath + "," +
Long.toString(offset);
+ }
+ }
+
+
public static final Logger LOG =
LoggerFactory.getLogger(HdfsState.class);
private Options options;
+ private volatile TxnRecord lastSeenTxn;
+ private Path indexFilePath;
- HdfsState(Options options){
+ HdfsState(Options options) {
this.options = options;
}
- void prepare(Map conf, IMetricsContext metrics, int partitionIndex,
int numPartitions){
+ void prepare(Map conf, IMetricsContext metrics, int partitionIndex,
int numPartitions) {
this.options.prepare(conf, partitionIndex, numPartitions);
+ initLastTxn(conf, partitionIndex);
+ }
+
+ private TxnRecord readTxnRecord(Path path) throws IOException {
+ FSDataInputStream inputStream = null;
+ try {
+ inputStream = this.options.fs.open(path);
+ BufferedReader reader = new BufferedReader(new
InputStreamReader(inputStream));
+ String line;
+ if ((line = reader.readLine()) != null) {
+ String[] fields = line.split(",");
+ return new TxnRecord(Long.valueOf(fields[0]), fields[1],
Long.valueOf(fields[2]));
+ }
+ } finally {
+ if (inputStream != null) {
+ inputStream.close();
+ }
+ }
+ return new TxnRecord(0, options.currentFile.toString(), 0);
+ }
+
+ /**
+ * Reads the last txn record from index file if it exists, if not
+ * from .tmp file if exists.
+ *
+ * @param indexFilePath the index file path
+ * @return the txn record from the index file or a default initial
record.
+ * @throws IOException
+ */
+ private TxnRecord getTxnRecord(Path indexFilePath) throws IOException {
+ Path tmpPath = new Path(indexFilePath.toString() + ".tmp");
--- End diff --
will do this.
> HdfsState ignores commits
> -------------------------
>
> Key: STORM-837
> URL: https://issues.apache.org/jira/browse/STORM-837
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Robert Joseph Evans
> Assignee: Arun Mahadevan
> Priority: Critical
>
> HdfsState works with Trident, which is supposed to provide exactly-once
> processing. It does this in two ways: first, by informing the state about
> commits so it can be sure the data is written out, and second, by having a
> commit id, so that double commits can be handled.
> HdfsState ignores the beginCommit and commit calls, and with that ignores the
> ids. This means that if you use HdfsState and your worker crashes you may
> both lose data and get some data twice.
> At a minimum the flush and file rotation should be tied to the commit in some
> way. The commit ID should at a minimum be written out with the data so
> someone reading the data can have a hope of deduping it themselves.
> Also, with the rotationActions it is possible for a file that was partially
> written to be leaked, and never moved to the final location, because it is
> not rotated. I personally think the actions are too generic for this case and
> need to be deprecated.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)