Repository: storm Updated Branches: refs/heads/master 4284b09c7 -> f48d7941b
STORM-1464: storm-hdfs support for multiple output files and partitioning Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7554fe20 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7554fe20 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7554fe20 Branch: refs/heads/master Commit: 7554fe2042369141d374b149de40949557ee70ea Parents: 500ef20 Author: Aaron Dossett <[email protected]> Authored: Mon Mar 21 13:06:44 2016 -0500 Committer: Aaron Dossett <[email protected]> Committed: Mon Mar 21 13:06:44 2016 -0500 ---------------------------------------------------------------------- external/storm-hdfs/README.md | 29 ++- .../storm/hdfs/bolt/AbstractHdfsBolt.java | 182 +++++++++++++------ .../storm/hdfs/bolt/AvroGenericRecordBolt.java | 75 +++----- .../org/apache/storm/hdfs/bolt/HdfsBolt.java | 39 ++-- .../storm/hdfs/bolt/SequenceFileBolt.java | 35 ++-- .../hdfs/bolt/rotation/FileRotationPolicy.java | 5 + .../bolt/rotation/FileSizeRotationPolicy.java | 8 + .../hdfs/bolt/rotation/NoRotationPolicy.java | 5 + .../hdfs/bolt/rotation/TimedRotationPolicy.java | 8 + .../storm/hdfs/common/AbstractHDFSWriter.java | 68 +++++++ .../common/AvroGenericRecordHDFSWriter.java | 80 ++++++++ .../apache/storm/hdfs/common/HDFSWriter.java | 66 +++++++ .../storm/hdfs/common/NullPartitioner.java | 31 ++++ .../apache/storm/hdfs/common/Partitioner.java | 36 ++++ .../storm/hdfs/common/SequenceFileWriter.java | 59 ++++++ .../hdfs/bolt/AvroGenericRecordBoltTest.java | 105 ++++++++--- .../apache/storm/hdfs/bolt/TestHdfsBolt.java | 34 +++- .../storm/hdfs/bolt/TestSequenceFileBolt.java | 4 +- .../apache/storm/hdfs/bolt/TestWritersMap.java | 48 +++++ 19 files changed, 730 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/README.md 
---------------------------------------------------------------------- diff --git a/external/storm-hdfs/README.md b/external/storm-hdfs/README.md index 2fc4c7b..3777481 100644 --- a/external/storm-hdfs/README.md +++ b/external/storm-hdfs/README.md @@ -184,6 +184,7 @@ Similar to sync policies, file rotation policies allow you to control when data public interface FileRotationPolicy extends Serializable { boolean mark(Tuple tuple, long offset); void reset(); + FileRotationPolicy copy(); } ``` @@ -240,6 +241,23 @@ If you are using Trident and sequence files you can do something like this: .addRotationAction(new MoveFileAction().withDestination("/dest2/")); ``` +### Data Partitioning +Data can be partitioned to different HDFS directories based on characteristics of the tuple being processed or purely +external factors, such as system time. To partition your data, write a class that implements the ```Partitioner``` +interface and pass it to the withPartitioner() method of your bolt. The getPartitionPath() method returns a partition +path for a given tuple. + +Here's an example of a Partitioner that operates on a specific field of data: + +```java + + Partitioner partitioner = new Partitioner() { + @Override + public String getPartitionPath(Tuple tuple) { + return Path.SEPARATOR + tuple.getStringByField("city"); + } + }; +``` ## HDFS Bolt Support for HDFS Sequence Files @@ -303,16 +321,15 @@ The `org.apache.storm.hdfs.bolt.AvroGenericRecordBolt` class allows you to write AvroGenericRecordBolt bolt = new AvroGenericRecordBolt() .withFsUrl("hdfs://localhost:54310") .withFileNameFormat(fileNameFormat) - .withSchemaAsString(schema) .withRotationPolicy(rotationPolicy) .withSyncPolicy(syncPolicy); ``` -The setup is very similar to the `SequenceFileBolt` example above. The key difference is that instead of specifying a -`SequenceFormat` you must provide a string representation of an Avro schema through the `withSchemaAsString()` method. 
-An `org.apache.avro.Schema` object cannot be directly provided since it does not implement `Serializable`. -The AvroGenericRecordBolt expects to receive tuples containing an Avro GenericRecord that conforms to the provided -schema. +The Avro bolt will write records to separate files based on the schema of the record being processed. In other words, +if the bolt receives records with two different schemas, it will write to two separate files. Each file will be rotated +in accordance with the specified rotation policy. If a large number of Avro schemas are expected, then the bolt should +be configured with a maximum number of open files at least equal to the number of schemas expected to prevent excessive +file open/close/create operations. To use this bolt you **must** register the appropriate Kryo serializers with your topology configuration. A convenience method is provided for this: http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index c56f486..5daa0fa 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -17,14 +17,12 @@ */ package org.apache.storm.hdfs.bolt; -import org.apache.storm.Config; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichBolt; import org.apache.storm.tuple.Tuple; import org.apache.storm.utils.TupleUtils; -import org.apache.storm.utils.Utils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; 
import org.apache.hadoop.fs.Path; @@ -32,6 +30,9 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.common.AbstractHDFSWriter; +import org.apache.storm.hdfs.common.NullPartitioner; +import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.apache.storm.hdfs.common.security.HdfsSecurityUtil; import org.slf4j.Logger; @@ -39,6 +40,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.Map; import java.util.List; @@ -52,15 +55,16 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { * Half of the default Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS */ private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15; + private static final Integer DEFAULT_MAX_OPEN_FILES = 50; - protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>(); - private Path currentFile; + protected Map<String, AbstractHDFSWriter> writers; + protected Map<String, Integer> rotationCounterMap = new HashMap<>(); + protected List<RotationAction> rotationActions = new ArrayList<>(); protected OutputCollector collector; protected transient FileSystem fs; protected SyncPolicy syncPolicy; protected FileRotationPolicy rotationPolicy; protected FileNameFormat fileNameFormat; - protected int rotation = 0; protected String fsUrl; protected String configKey; protected transient Object writeLock; @@ -69,22 +73,21 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { protected long offset = 0; protected Integer fileRetryCount = DEFAULT_RETRY_COUNT; protected Integer tickTupleInterval = DEFAULT_TICK_TUPLE_INTERVAL_SECS; + protected Integer maxOpenFiles = 
DEFAULT_MAX_OPEN_FILES; + protected Partitioner partitioner = new NullPartitioner(); protected transient Configuration hdfsConfig; - protected void rotateOutputFile() throws IOException { + protected void rotateOutputFile(AbstractHDFSWriter writer) throws IOException { LOG.info("Rotating output file..."); long start = System.currentTimeMillis(); synchronized (this.writeLock) { - closeOutputFile(); - this.rotation++; + writer.close(); - Path newFile = createOutputFile(); LOG.info("Performing {} file rotation actions.", this.rotationActions.size()); for (RotationAction action : this.rotationActions) { - action.execute(this.fs, this.currentFile); + action.execute(this.fs, writer.getFilePath()); } - this.currentFile = newFile; } long time = System.currentTimeMillis() - start; LOG.info("File rotation took {} ms.", time); @@ -104,6 +107,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { throw new IllegalStateException("File system URL must be specified."); } + writers = new WritersMap(this.maxOpenFiles); + this.collector = collector; this.fileNameFormat.prepare(conf, topologyContext); this.hdfsConfig = new Configuration(); @@ -117,26 +122,12 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { try{ HdfsSecurityUtil.login(conf, hdfsConfig); doPrepare(conf, topologyContext, collector); - this.currentFile = createOutputFile(); - } catch (Exception e){ throw new RuntimeException("Error preparing HdfsBolt: " + e.getMessage(), e); } if(this.rotationPolicy instanceof TimedRotationPolicy){ - long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval(); - this.rotationTimer = new Timer(true); - TimerTask task = new TimerTask() { - @Override - public void run() { - try { - rotateOutputFile(); - } catch(IOException e){ - LOG.warn("IOException during scheduled file rotation.", e); - } - } - }; - this.rotationTimer.scheduleAtFixedRate(task, interval, interval); + startTimedRotationPolicy(); } } @@ -145,13 +136,20 @@ public abstract class 
AbstractHdfsBolt extends BaseRichBolt { synchronized (this.writeLock) { boolean forceSync = false; + AbstractHDFSWriter writer = null; + String writerKey = null; + if (TupleUtils.isTick(tuple)) { LOG.debug("TICK! forcing a file system flush"); this.collector.ack(tuple); forceSync = true; } else { + + writerKey = getHashKeyForTuple(tuple); + try { - writeTuple(tuple); + writer = getOrCreateWriter(writerKey, tuple); + this.offset = writer.write(tuple); tupleBatch.add(tuple); } catch (IOException e) { //If the write failed, try to sync anything already written @@ -171,7 +169,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { while (success == false && attempts < fileRetryCount) { attempts += 1; try { - syncTuples(); + syncAllWriters(); LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size()); for (Tuple t : tupleBatch) { this.collector.ack(t); @@ -198,22 +196,53 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } } - if(this.rotationPolicy.mark(tuple, this.offset)) { - try { - rotateOutputFile(); - this.rotationPolicy.reset(); - this.offset = 0; - } catch (IOException e) { - this.collector.reportError(e); - LOG.warn("File could not be rotated"); - //At this point there is nothing to do. In all likelihood any filesystem operations will fail. - //The next tuple will almost certainly fail to write and/or sync, which force a rotation. That - //will give rotateAndReset() a chance to work which includes creating a fresh file handle. 
- } + if (writer != null && writer.needsRotation()) { + doRotationAndRemoveWriter(writerKey, writer); } } } + private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException { + AbstractHDFSWriter writer; + + writer = writers.get(writerKey); + if (writer == null) { + Path pathForNextFile = getBasePathForNextFile(tuple); + writer = makeNewWriter(pathForNextFile, tuple); + writers.put(writerKey, writer); + } + return writer; + } + + /** + * A tuple must be mapped to a writer based on two factors: + * - bolt specific logic that must separate tuples into different files in the same directory (see the avro bolt + * for an example of this) + * - the directory the tuple will be partitioned into + * + * @param tuple + * @return + */ + private String getHashKeyForTuple(Tuple tuple) { + final String boltKey = getWriterKey(tuple); + final String partitionDir = this.partitioner.getPartitionPath(tuple); + return boltKey + "****" + partitionDir; + } + + void doRotationAndRemoveWriter(String writerKey, AbstractHDFSWriter writer) { + try { + rotateOutputFile(writer); + } catch (IOException e) { + this.collector.reportError(e); + LOG.error("File could not be rotated"); + //At this point there is nothing to do. In all likelihood any filesystem operations will fail. + //The next tuple will almost certainly fail to write and/or sync, which force a rotation. That + //will give rotateAndReset() a chance to work which includes creating a fresh file handle. + } finally { + writers.remove(writerKey); + } + } + @Override public Map<String, Object> getComponentConfiguration() { return TupleUtils.putTickFrequencyIntoComponentConfig(super.getComponentConfiguration(), tickTupleInterval); @@ -223,29 +252,64 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { } - /** - * writes a tuple to the underlying filesystem but makes no guarantees about syncing data. 
- * - * this.offset is also updated to reflect additional data written - * - * @param tuple - * @throws IOException - */ - abstract protected void writeTuple(Tuple tuple) throws IOException; + private void syncAllWriters() throws IOException { + for (AbstractHDFSWriter writer : writers.values()) { + writer.sync(); + } + } - /** - * Make the best effort to sync written data to the underlying file system. Concrete classes should very clearly - * state the file state that sync guarantees. For example, HdfsBolt can make a much stronger guarantee than - * SequenceFileBolt. - * - * @throws IOException - */ - abstract protected void syncTuples() throws IOException; + private void startTimedRotationPolicy() { + long interval = ((TimedRotationPolicy)this.rotationPolicy).getInterval(); + this.rotationTimer = new Timer(true); + TimerTask task = new TimerTask() { + @Override + public void run() { + for (final AbstractHDFSWriter writer : writers.values()) { + try { + rotateOutputFile(writer); + } catch (IOException e) { + LOG.warn("IOException during scheduled file rotation.", e); + } + } + writers.clear(); + } + }; + this.rotationTimer.scheduleAtFixedRate(task, interval, interval); + } - abstract protected void closeOutputFile() throws IOException; + protected Path getBasePathForNextFile(Tuple tuple) { - abstract protected Path createOutputFile() throws IOException; + final String partitionPath = this.partitioner.getPartitionPath(tuple); + final int rotation; + if (rotationCounterMap.containsKey(partitionPath)) + { + rotation = rotationCounterMap.get(partitionPath) + 1; + } else { + rotation = 0; + } + rotationCounterMap.put(partitionPath, rotation); + + return new Path(this.fsUrl + this.fileNameFormat.getPath() + partitionPath, + this.fileNameFormat.getName(rotation, System.currentTimeMillis())); + } abstract protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException; + abstract protected String getWriterKey(Tuple tuple); 
+ + abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException; + + static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> { + final long maxWriters; + + public WritersMap(long maxWriters) { + super((int)maxWriters, 0.75f, true); + this.maxWriters = maxWriters; + } + + @Override + protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) { + return this.size() > this.maxWriters; + } + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java index cdeb2f8..e173d2a 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBolt.java @@ -17,18 +17,16 @@ */ package org.apache.storm.hdfs.bolt; +import org.apache.storm.hdfs.common.AbstractHDFSWriter; +import org.apache.storm.hdfs.common.AvroGenericRecordHDFSWriter; +import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.tuple.Tuple; import org.apache.avro.Schema; -import org.apache.avro.file.DataFileWriter; -import org.apache.avro.io.DatumWriter; import org.apache.avro.generic.GenericRecord; -import org.apache.avro.generic.GenericDatumWriter; -import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; @@ -38,24 +36,12 @@ 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URI; -import java.util.EnumSet; import java.util.Map; public class AvroGenericRecordBolt extends AbstractHdfsBolt{ private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordBolt.class); - private transient FSDataOutputStream out; - private Schema schema; - private String schemaAsString; - private DataFileWriter<GenericRecord> avroWriter; - - public AvroGenericRecordBolt withSchemaAsString(String schemaAsString) - { - this.schemaAsString = schemaAsString; - return this; - } - public AvroGenericRecordBolt withFsUrl(String fsUrl){ this.fsUrl = fsUrl; return this; @@ -91,51 +77,36 @@ public class AvroGenericRecordBolt extends AbstractHdfsBolt{ return this; } - @Override - protected void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException { - LOG.info("Preparing AvroGenericRecord Bolt..."); - this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); - Schema.Parser parser = new Schema.Parser(); - this.schema = parser.parse(this.schemaAsString); + public AvroGenericRecordBolt withMaxOpenFiles(int maxOpenFiles) { + this.maxOpenFiles = maxOpenFiles; + return this; } - @Override - protected void writeTuple(Tuple tuple) throws IOException { - GenericRecord avroRecord = (GenericRecord) tuple.getValue(0); - avroWriter.append(avroRecord); - offset = this.out.getPos(); + public AvroGenericRecordBolt withPartitioner(Partitioner partitioner) { + this.partitioner = partitioner; + return this; } @Override - protected void syncTuples() throws IOException { - avroWriter.flush(); - - LOG.debug("Attempting to sync all data to filesystem"); - if (this.out instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); - } else { - this.out.hsync(); - } - this.syncPolicy.reset(); + protected void doPrepare(Map conf, TopologyContext topologyContext, 
OutputCollector collector) throws IOException { + LOG.info("Preparing AvroGenericRecord Bolt..."); + this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); } + /** + * AvroGenericRecordBolt must override this method because messages with different schemas cannot be written to the + * same file. By treating the complete schema as the "key" AbstractHdfsBolt will associate a different writer for + * every distinct schema. + */ @Override - protected void closeOutputFile() throws IOException - { - avroWriter.close(); - this.out.close(); + protected String getWriterKey(Tuple tuple) { + Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema(); + return recordSchema.toString(); } @Override - protected Path createOutputFile() throws IOException { - Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); - this.out = this.fs.create(path); - - //Initialize writer - DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); - avroWriter = new DataFileWriter<>(datumWriter); - avroWriter.create(this.schema, this.out); - - return path; + protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException { + Schema recordSchema = ((GenericRecord) tuple.getValue(0)).getSchema(); + return new AvroGenericRecordHDFSWriter(this.rotationPolicy, path, this.fs.create(path), recordSchema); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java index 0299f43..614de6b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java @@ -23,12 +23,13 @@ 
import org.apache.storm.tuple.Tuple; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; -import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag; import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.RecordFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.common.AbstractHDFSWriter; +import org.apache.storm.hdfs.common.HDFSWriter; +import org.apache.storm.hdfs.common.Partitioner; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,38 +90,30 @@ public class HdfsBolt extends AbstractHdfsBolt{ return this; } - @Override - public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException { - LOG.info("Preparing HDFS Bolt..."); - this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); + public HdfsBolt withPartitioner(Partitioner partitioner) { + this.partitioner = partitioner; + return this; } - @Override - protected void syncTuples() throws IOException { - LOG.debug("Attempting to sync all data to filesystem"); - if (this.out instanceof HdfsDataOutputStream) { - ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); - } else { - this.out.hsync(); - } + public HdfsBolt withMaxOpenFiles(int maxOpenFiles) { + this.maxOpenFiles = maxOpenFiles; + return this; } @Override - protected void writeTuple(Tuple tuple) throws IOException { - byte[] bytes = this.format.format(tuple); - out.write(bytes); - this.offset += bytes.length; + public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException { + LOG.info("Preparing HDFS Bolt..."); + this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig); } 
@Override - protected void closeOutputFile() throws IOException { - this.out.close(); + protected String getWriterKey(Tuple tuple) { + return "CONSTANT"; } @Override - protected Path createOutputFile() throws IOException { - Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); + protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException { this.out = this.fs.create(path); - return path; + return new HDFSWriter(rotationPolicy,path, out, format); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java index e0db7c9..3c78075 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java @@ -28,6 +28,9 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.format.SequenceFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.common.AbstractHDFSWriter; +import org.apache.storm.hdfs.common.Partitioner; +import org.apache.storm.hdfs.common.SequenceFileWriter; import org.apache.storm.hdfs.common.rotation.RotationAction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -104,6 +107,16 @@ public class SequenceFileBolt extends AbstractHdfsBolt { return this; } + public SequenceFileBolt withPartitioner(Partitioner partitioner) { + this.partitioner = partitioner; + return this; + } + + public SequenceFileBolt withMaxOpenFiles(int maxOpenFiles) { + this.maxOpenFiles = maxOpenFiles; + 
return this; + } + @Override public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException { LOG.info("Preparing Sequence File Bolt..."); @@ -114,30 +127,20 @@ public class SequenceFileBolt extends AbstractHdfsBolt { } @Override - protected void syncTuples() throws IOException { - LOG.debug("Attempting to sync all data to filesystem"); - this.writer.hsync(); + protected String getWriterKey(Tuple tuple) { + return "CONSTANT"; } @Override - protected void writeTuple(Tuple tuple) throws IOException { - this.writer.append(this.format.key(tuple), this.format.value(tuple)); - this.offset = this.writer.getLength(); - } - - protected Path createOutputFile() throws IOException { - Path p = new Path(this.fsUrl + this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis())); - this.writer = SequenceFile.createWriter( + protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException { + SequenceFile.Writer writer = SequenceFile.createWriter( this.hdfsConfig, - SequenceFile.Writer.file(p), + SequenceFile.Writer.file(path), SequenceFile.Writer.keyClass(this.format.keyClass()), SequenceFile.Writer.valueClass(this.format.valueClass()), SequenceFile.Writer.compression(this.compressionType, this.codecFactory.getCodecByName(this.compressionCodec)) ); - return p; - } - protected void closeOutputFile() throws IOException { - this.writer.close(); + return new SequenceFileWriter(this.rotationPolicy, path, writer, this.format); } } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java index 90ef772..aeb63fa 
100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileRotationPolicy.java @@ -48,4 +48,9 @@ public interface FileRotationPolicy extends Serializable { * */ void reset(); + + /** + * Must be able to copy the rotation policy + */ + FileRotationPolicy copy(); } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java index f0df921..5fb9bbc 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/FileSizeRotationPolicy.java @@ -64,6 +64,10 @@ public class FileSizeRotationPolicy implements FileRotationPolicy { this.maxBytes = (long)(count * units.getByteCount()); } + protected FileSizeRotationPolicy(long maxBytes) { + this.maxBytes = maxBytes; + } + @Override public boolean mark(Tuple tuple, long offset) { long diff = offset - this.lastOffset; @@ -78,4 +82,8 @@ public class FileSizeRotationPolicy implements FileRotationPolicy { this.lastOffset = 0; } + @Override + public FileRotationPolicy copy() { + return new FileSizeRotationPolicy(this.maxBytes); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java index 14fa496..a00037b 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/NoRotationPolicy.java @@ -32,4 +32,9 @@ public class NoRotationPolicy implements FileRotationPolicy { @Override public void reset() { } + + @Override + public FileRotationPolicy copy() { + return this; + } } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java index 84762a0..06fada8 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/rotation/TimedRotationPolicy.java @@ -45,6 +45,9 @@ public class TimedRotationPolicy implements FileRotationPolicy { this.interval = (long)(count * units.getMilliSeconds()); } + protected TimedRotationPolicy(long interval) { + this.interval = interval; + } /** * Called for every tuple the HdfsBolt executes. 
@@ -66,6 +69,11 @@ public class TimedRotationPolicy implements FileRotationPolicy { } + @Override + public FileRotationPolicy copy() { + return new TimedRotationPolicy(this.interval); + } + public long getInterval(){ return this.interval; } http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java new file mode 100644 index 0000000..4b36377 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.hdfs.common; + +import org.apache.hadoop.fs.Path; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.tuple.Tuple; + +import java.io.IOException; + +abstract public class AbstractHDFSWriter { + long lastUsedTime; + long offset; + boolean needsRotation; + Path filePath; + FileRotationPolicy rotationPolicy; + + AbstractHDFSWriter(FileRotationPolicy policy, Path path) { + //This must be defensively copied, because a bolt probably has only one rotation policy object + this.rotationPolicy = policy.copy(); + this.filePath = path; + } + + final public long write(Tuple tuple) throws IOException{ + doWrite(tuple); + this.needsRotation = rotationPolicy.mark(tuple, offset); + + return this.offset; + } + + final public void sync() throws IOException { + doSync(); + } + + final public void close() throws IOException { + doClose(); + } + + public boolean needsRotation() { + return needsRotation; + } + + public Path getFilePath() { + return this.filePath; + } + + abstract protected void doWrite(Tuple tuple) throws IOException; + + abstract protected void doSync() throws IOException; + + abstract protected void doClose() throws IOException; + +} http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java new file mode 100644 index 0000000..6e957c2 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AvroGenericRecordHDFSWriter.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.common; + + +import org.apache.avro.Schema; +import org.apache.avro.file.DataFileWriter; +import org.apache.avro.generic.GenericDatumWriter; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumWriter; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.EnumSet; + +public class AvroGenericRecordHDFSWriter extends AbstractHDFSWriter { + + private static final Logger LOG = LoggerFactory.getLogger(AvroGenericRecordHDFSWriter.class); + + private FSDataOutputStream out; + private Schema schema; + private DataFileWriter<GenericRecord> avroWriter; + + public AvroGenericRecordHDFSWriter(FileRotationPolicy policy, Path path, FSDataOutputStream stream, Schema schema) throws IOException { + super(policy, path); + this.out = stream; + this.schema = schema; + DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema); + avroWriter = new DataFileWriter<>(datumWriter); + avroWriter.create(this.schema, this.out); + } + + 
@Override + protected void doWrite(Tuple tuple) throws IOException { + GenericRecord avroRecord = (GenericRecord) tuple.getValue(0); + avroWriter.append(avroRecord); + offset = this.out.getPos(); + + this.needsRotation = this.rotationPolicy.mark(tuple, offset); + } + + @Override + protected void doSync() throws IOException { + avroWriter.flush(); + + LOG.debug("Attempting to sync all data to filesystem"); + if (this.out instanceof HdfsDataOutputStream) { + ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } else { + this.out.hsync(); + } + } + + @Override + protected void doClose() throws IOException { + this.avroWriter.close(); + this.out.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java new file mode 100644 index 0000000..d69d770 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/HDFSWriter.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.common; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.storm.hdfs.bolt.format.RecordFormat; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.EnumSet; + +public class HDFSWriter extends AbstractHDFSWriter{ + + private static final Logger LOG = LoggerFactory.getLogger(HDFSWriter.class); + + private FSDataOutputStream out; + private RecordFormat format; + + public HDFSWriter(FileRotationPolicy policy, Path path, FSDataOutputStream out, RecordFormat format) { + super(policy, path); + this.out = out; + this.format = format; + } + + @Override + protected void doWrite(Tuple tuple) throws IOException { + byte[] bytes = this.format.format(tuple); + out.write(bytes); + this.offset += bytes.length; + } + + @Override + protected void doSync() throws IOException { + LOG.info("Attempting to sync all data to filesystem"); + if (this.out instanceof HdfsDataOutputStream) { + ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH)); + } else { + this.out.hsync(); + } + } + + @Override + protected void doClose() throws IOException { + this.out.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java new file mode 100644 index 0000000..fd50496 --- /dev/null +++ 
b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/NullPartitioner.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.common; + +import org.apache.storm.tuple.Tuple; + +/** + * The NullPartitioner partitions every tuple to the empty string. In other words, no partition subdirectories will + * be added to the path. + */ +public class NullPartitioner implements Partitioner { + @Override + public String getPartitionPath(final Tuple tuple) { + return ""; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java new file mode 100644 index 0000000..6cf0fbd --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/Partitioner.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.common; + +import org.apache.storm.tuple.Tuple; + +import java.io.Serializable; + +public interface Partitioner extends Serializable{ + + /** + * Return a relative path that the tuple should be written to. For example, if an HdfsBolt were configured to write + * to /common/output and a partitioner returned "/foo" then the bolt should open a file in "/common/output/foo" + * + * A best practice is to use Path.SEPARATOR instead of a literal "/" + * + * @param tuple The tuple for which the relative path is being calculated. 
+ * @return the relative path that the tuple should be written to + */ + public String getPartitionPath(final Tuple tuple); +} http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java new file mode 100644 index 0000000..ec78fd6 --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/SequenceFileWriter.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.hdfs.common; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.SequenceFile; +import org.apache.storm.hdfs.bolt.format.SequenceFormat; +import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; +import org.apache.storm.tuple.Tuple; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +public class SequenceFileWriter extends AbstractHDFSWriter{ + + private static final Logger LOG = LoggerFactory.getLogger(SequenceFileWriter.class); + + private SequenceFile.Writer writer; + private SequenceFormat format; + + public SequenceFileWriter(FileRotationPolicy policy, Path path, SequenceFile.Writer writer, SequenceFormat format) { + super(policy, path); + this.writer = writer; + this.format = format; + } + + @Override + protected void doWrite(Tuple tuple) throws IOException { + this.writer.append(this.format.key(tuple), this.format.value(tuple)); + this.offset = this.writer.getLength(); + } + + @Override + protected void doSync() throws IOException { + LOG.debug("Attempting to sync all data to filesystem"); + this.writer.hsync(); + } + + @Override + protected void doClose() throws IOException { + this.writer.close(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java index 8ff05bc..cd828da 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/AvroGenericRecordBoltTest.java @@ -27,7 +27,7 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import 
org.apache.storm.tuple.Values; import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecordBuilder; import org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -40,6 +40,7 @@ import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; import org.junit.Before; import org.junit.After; +import org.junit.Ignore; import org.junit.Test; import org.junit.Assert; @@ -65,28 +66,38 @@ public class AvroGenericRecordBoltTest { private DistributedFileSystem fs; private MiniDFSCluster hdfsCluster; private static final String testRoot = "/unittest"; - private static final Schema schema; + private static final Schema schema1; + private static final Schema schema2; private static final Tuple tuple1; private static final Tuple tuple2; - private static final String userSchema = "{\"type\":\"record\"," + + private static final String schemaV1 = "{\"type\":\"record\"," + "\"name\":\"myrecord\"," + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + private static final String schemaV2 = "{\"type\":\"record\"," + + "\"name\":\"myrecord\"," + + "\"fields\":[{\"name\":\"foo1\",\"type\":\"string\"}," + + "{ \"name\":\"bar\", \"type\":\"string\", \"default\":\"baz\" }," + + "{ \"name\":\"int1\", \"type\":\"int\" }]}"; + static { Schema.Parser parser = new Schema.Parser(); - schema = parser.parse(userSchema); + schema1 = parser.parse(schemaV1); + + parser = new Schema.Parser(); + schema2 = parser.parse(schemaV2); - GenericRecord record1 = new GenericData.Record(schema); - record1.put("foo1", "bar1"); - record1.put("int1", 1); - tuple1 = generateTestTuple(record1); + GenericRecordBuilder builder1 = new GenericRecordBuilder(schema1); + builder1.set("foo1", "bar1"); + builder1.set("int1", 1); + tuple1 = generateTestTuple(builder1.build()); - GenericRecord 
record2 = new GenericData.Record(schema); - record2.put("foo1", "bar2"); - record2.put("int1", 2); - tuple2 = generateTestTuple(record2); + GenericRecordBuilder builder2 = new GenericRecordBuilder(schema2); + builder2.set("foo1", "bar2"); + builder2.set("int1", 2); + tuple2 = generateTestTuple(builder2.build()); } @Mock private OutputCollector collector; @@ -116,30 +127,76 @@ public class AvroGenericRecordBoltTest { @Test public void multipleTuplesOneFile() throws IOException { - AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, userSchema); + AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1f, schemaV1); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); - bolt.execute(tuple2); bolt.execute(tuple1); - bolt.execute(tuple2); + bolt.execute(tuple1); + bolt.execute(tuple1); Assert.assertEquals(1, countNonZeroLengthFiles(testRoot)); - verifyAllAvroFiles(testRoot, schema); + verifyAllAvroFiles(testRoot); } @Test public void multipleTuplesMutliplesFiles() throws IOException { - AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, userSchema); + AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, .0001f, schemaV1); + + bolt.prepare(new Config(), topologyContext, collector); + bolt.execute(tuple1); + bolt.execute(tuple1); + bolt.execute(tuple1); + bolt.execute(tuple1); + + Assert.assertEquals(4, countNonZeroLengthFiles(testRoot)); + verifyAllAvroFiles(testRoot); + } + + @Test public void forwardSchemaChangeWorks() throws IOException + { + AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV1); + + bolt.prepare(new Config(), topologyContext, collector); + bolt.execute(tuple1); + bolt.execute(tuple2); + + //Schema change should have forced a rotation + Assert.assertEquals(2, countNonZeroLengthFiles(testRoot)); + + verifyAllAvroFiles(testRoot); + } + + @Test public void backwardSchemaChangeWorks() throws IOException + { + AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2); + + 
bolt.prepare(new Config(), topologyContext, collector); + bolt.execute(tuple1); + bolt.execute(tuple2); + + //Schema changes should have forced file rotations + Assert.assertEquals(2, countNonZeroLengthFiles(testRoot)); + verifyAllAvroFiles(testRoot); + } + + @Test public void schemaThrashing() throws IOException + { + AvroGenericRecordBolt bolt = makeAvroBolt(hdfsURI, 1, 1000f, schemaV2); bolt.prepare(new Config(), topologyContext, collector); bolt.execute(tuple1); bolt.execute(tuple2); bolt.execute(tuple1); bolt.execute(tuple2); + bolt.execute(tuple1); + bolt.execute(tuple2); + bolt.execute(tuple1); + bolt.execute(tuple2); - Assert.assertEquals(4, countNonZeroLengthFiles(testRoot)); - verifyAllAvroFiles(testRoot, schema); + //Two distinct schema should result in only two files + Assert.assertEquals(2, countNonZeroLengthFiles(testRoot)); + verifyAllAvroFiles(testRoot); } private AvroGenericRecordBolt makeAvroBolt(String nameNodeAddr, int countSync, float rotationSizeMB, String schemaAsString) { @@ -154,7 +211,6 @@ public class AvroGenericRecordBoltTest { return new AvroGenericRecordBolt() .withFsUrl(nameNodeAddr) .withFileNameFormat(fieldsFileNameFormat) - .withSchemaAsString(schemaAsString) .withRotationPolicy(rotationPolicy) .withSyncPolicy(fieldsSyncPolicy); } @@ -171,12 +227,12 @@ public class AvroGenericRecordBoltTest { return new TupleImpl(topologyContext, new Values(record), 1, ""); } - private void verifyAllAvroFiles(String path, Schema schema) throws IOException { + private void verifyAllAvroFiles(String path) throws IOException { Path p = new Path(path); for (FileStatus file : fs.listStatus(p)) { if (file.getLen() > 0) { - fileIsGoodAvro(file.getPath(), schema); + fileIsGoodAvro(file.getPath()); } } } @@ -194,8 +250,8 @@ public class AvroGenericRecordBoltTest { return nonZero; } - private void fileIsGoodAvro (Path path, Schema schema) throws IOException { - DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema); + private void 
fileIsGoodAvro (Path path) throws IOException { + DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(); FSDataInputStream in = fs.open(path, 0); FileOutputStream out = new FileOutputStream("target/FOO.avro"); @@ -212,7 +268,6 @@ public class AvroGenericRecordBoltTest { GenericRecord user = null; while (dataFileReader.hasNext()) { user = dataFileReader.next(user); - System.out.println(user); } file.delete(); http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java index ecbad8a..e8f0702 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestHdfsBolt.java @@ -35,6 +35,7 @@ import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy; import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; +import org.apache.storm.hdfs.common.Partitioner; import org.junit.Before; import org.junit.After; import org.junit.Rule; @@ -109,6 +110,30 @@ public class TestHdfsBolt { } @Test + public void testPartitionedOutput() throws IOException { + HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, 1000f); + + Partitioner partitoner = new Partitioner() { + @Override + public String getPartitionPath(Tuple tuple) { + return Path.SEPARATOR + tuple.getStringByField("city"); + } + }; + + bolt.prepare(new Config(), topologyContext, collector); + bolt.withPartitioner(partitoner); + + bolt.execute(tuple1); + bolt.execute(tuple2); + + verify(collector).ack(tuple1); + verify(collector).ack(tuple2); + + Assert.assertEquals(1, 
countNonZeroLengthFiles(testRoot + "/SFO")); + Assert.assertEquals(1, countNonZeroLengthFiles(testRoot + "/SJO")); + } + + @Test public void testTwoTuplesOneFile() throws IOException { HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f); @@ -127,8 +152,9 @@ public class TestHdfsBolt { @Test public void testFailedSync() throws IOException { - HdfsBolt bolt = makeHdfsBolt(hdfsURI, 1, .00001f); + HdfsBolt bolt = makeHdfsBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); + bolt.execute(tuple1); fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); @@ -138,8 +164,8 @@ public class TestHdfsBolt { } - // One tuple and one rotation should yield one file with data and one zero length file - // The failed executions should not cause rotations and new zero length files + // One tuple and one rotation should yield one file with data + // The failed executions should not cause rotations and any new files @Test public void testFailureFilecount() throws IOException, InterruptedException { @@ -168,7 +194,7 @@ public class TestHdfsBolt { } Assert.assertEquals(1, countNonZeroLengthFiles(testRoot)); - Assert.assertEquals(1, countZeroLengthFiles(testRoot)); + Assert.assertEquals(0, countZeroLengthFiles(testRoot)); } @Test http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java index 870d4ca..9913d9d 100644 --- a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestSequenceFileBolt.java @@ -127,14 +127,14 @@ public class TestSequenceFileBolt { @Test public void testFailedSync() throws IOException { - SequenceFileBolt 
bolt = makeSeqBolt(hdfsURI, 1, .00001f); + SequenceFileBolt bolt = makeSeqBolt(hdfsURI, 2, 10000f); bolt.prepare(new Config(), topologyContext, collector); + bolt.execute(tuple1); fs.setSafeMode(SafeModeAction.SAFEMODE_ENTER); // All writes/syncs will fail so this should cause a RuntimeException thrown.expect(RuntimeException.class); bolt.execute(tuple1); - } private SequenceFileBolt makeSeqBolt(String nameNodeAddr, int countSync, float rotationSizeMB) { http://git-wip-us.apache.org/repos/asf/storm/blob/7554fe20/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java new file mode 100644 index 0000000..fd99efe --- /dev/null +++ b/external/storm-hdfs/src/test/java/org/apache/storm/hdfs/bolt/TestWritersMap.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.hdfs.bolt; + +import org.apache.storm.hdfs.common.AbstractHDFSWriter; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mock; + +public class TestWritersMap { + + AbstractHdfsBolt.WritersMap map = new AbstractHdfsBolt.WritersMap(2); + @Mock AbstractHDFSWriter foo; + @Mock AbstractHDFSWriter bar; + @Mock AbstractHDFSWriter baz; + + @Test public void testLRUBehavior() + { + map.put("FOO", foo); + map.put("BAR", bar); + + //Access foo to make it most recently used + map.get("FOO"); + + //Add an element and bar should drop out + map.put("BAZ", baz); + + Assert.assertTrue(map.keySet().contains("FOO")); + Assert.assertTrue(map.keySet().contains("BAZ")); + + Assert.assertFalse(map.keySet().contains("BAR")); + } +}
