[ https://issues.apache.org/jira/browse/STORM-1464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15192887#comment-15192887 ]
ASF GitHub Bot commented on STORM-1464:
---------------------------------------
Github user arunmahadevan commented on a diff in the pull request:
https://github.com/apache/storm/pull/1044#discussion_r55965923
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---
@@ -223,29 +259,57 @@ public final void execute(Tuple tuple) {
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
-    /**
-     * writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
-     *
-     * this.offset is also updated to reflect additional data written
-     *
-     * @param tuple
-     * @throws IOException
-     */
-    abstract void writeTuple(Tuple tuple) throws IOException;
+    private void syncAllWriters() throws IOException {
+        for (AbstractHDFSWriter writer : writers.values()) {
+            writer.sync();
+        }
+    }
-    /**
-     * Make the best effort to sync written data to the underlying file system. Concrete classes should very clearly
-     * state the file state that sync guarantees. For example, HdfsBolt can make a much stronger guarantee than
-     * SequenceFileBolt.
-     *
-     * @throws IOException
-     */
-    abstract void syncTuples() throws IOException;
+    private String getKeyToOldestWriter()
+    {
+        String oldestKey = null;
+        long oldestTime = Long.MAX_VALUE;
+        for (final Map.Entry<String, AbstractHDFSWriter> entry : writers.entrySet()) {
+            if (entry.getValue().getLastUsedTime() < oldestTime) {
+                oldestKey = entry.getKey();
+                oldestTime = entry.getValue().getLastUsedTime();
+            }
+        }
-    abstract void closeOutputFile() throws IOException;
+        return oldestKey;
+    }
-    abstract Path createOutputFile() throws IOException;
+    private void startTimedRotationPolicy() {
+        long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval();
+        this.rotationTimer = new Timer(true);
+        TimerTask task = new TimerTask() {
+            @Override
+            public void run() {
+                for (final AbstractHDFSWriter writer : writers.values()) {
+                    try {
+                        rotateOutputFile(writer);
+                    } catch (IOException e) {
+                        LOG.warn("IOException during scheduled file rotation.", e);
+                    }
+                }
+                writers.clear();
+            }
+        };
+        this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+
+    protected Path getBasePathForNextFile(Tuple tuple) {
+
+        Path fullPathToFile = new Path(this.fsUrl + this.fileNameFormat.getPath() + this.partitioner.getPartitionPath(tuple),
+                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
--- End diff ---
Since `this.rotation` is not per writer, we could end up having non-contiguous file names (like file-1, file-5, file-8, etc.) inside a partition path, correct? If so, it should be fixed.
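As a rough sketch of what a per-writer rotation index could look like (not the PR's actual fix): the `rotationCounters` map below is hypothetical, keyed the same way as `writers`, and the `fsUrl`, `fileNameFormat`, and `partitioner` fields are the ones visible in the diff above; `Map`/`HashMap` are java.util and `Path` is Hadoop's.

    // Hypothetical fragment of AbstractHdfsBolt, not code from the PR: keep one rotation
    // counter per writer key so file indices stay contiguous within each partition path.
    private final Map<String, Long> rotationCounters = new HashMap<>();

    protected Path getBasePathForNextFile(Tuple tuple) {
        String partitionPath = this.partitioner.getPartitionPath(tuple);
        // advance only the counter for this partition path, not a shared this.rotation
        long rotation = rotationCounters.merge(partitionPath, 1L, Long::sum);
        return new Path(this.fsUrl + this.fileNameFormat.getPath() + partitionPath,
                this.fileNameFormat.getName(rotation, System.currentTimeMillis()));
    }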
> storm-hdfs should support writing to multiple files
> ---------------------------------------------------
>
> Key: STORM-1464
> URL: https://issues.apache.org/jira/browse/STORM-1464
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-hdfs
> Reporter: Aaron Dossett
> Assignee: Aaron Dossett
> Labels: avro
>
> Examples of when this is needed include:
> - One avro bolt writing multiple schemas, each of which requires a different file. Schema evolution is a common use of avro, and the avro bolt should support that seamlessly.
> - Partitioning output to different directories based on the tuple contents. For example, if the tuple contains a "USER" field, it should be possible to partition based on that value.
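
For the partitioned-output case in the quoted description, a minimal illustration of a field-based partitioner: the `FieldPartitioner` class name and the "USER" field are hypothetical, and the `Partitioner` interface with its `getPartitionPath(Tuple)` method is assumed to be the one this PR introduces and the diff above calls.

    import org.apache.hadoop.fs.Path;
    import org.apache.storm.tuple.Tuple;

    // Hypothetical example only: routes tuples into per-user subdirectories.
    public class FieldPartitioner implements Partitioner {
        @Override
        public String getPartitionPath(Tuple tuple) {
            // e.g. a tuple whose USER field is "alice" is written under .../USER=alice/
            return Path.SEPARATOR + "USER=" + tuple.getStringByField("USER");
        }
    }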
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)