Repository: storm
Updated Branches:
  refs/heads/master 29a44e57a -> d06bb3856
STORM-2517 add interface for Writer, make AbstractHDFSWriter properties protected Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3ece270 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3ece270 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3ece270 Branch: refs/heads/master Commit: d3ece270fbcace79195a4217731c3f82bcf2b360 Parents: c38d795 Author: Angus Helm <[email protected]> Authored: Thu Aug 24 15:34:19 2017 -0500 Committer: Angus Helm <[email protected]> Committed: Thu Aug 24 15:34:19 2017 -0500 ---------------------------------------------------------------------- .../storm/hdfs/bolt/AbstractHdfsBolt.java | 23 ++++++------- .../java/org/apache/storm/hdfs/bolt/Writer.java | 35 ++++++++++++++++++++ .../storm/hdfs/common/AbstractHDFSWriter.java | 17 +++++----- 3 files changed, 55 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d3ece270/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java index 681d66a..366c50c 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java @@ -30,7 +30,6 @@ import org.apache.storm.hdfs.bolt.format.FileNameFormat; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy; import org.apache.storm.hdfs.bolt.sync.SyncPolicy; -import org.apache.storm.hdfs.common.AbstractHDFSWriter; import org.apache.storm.hdfs.common.NullPartitioner; import org.apache.storm.hdfs.common.Partitioner; 
import org.apache.storm.hdfs.common.rotation.RotationAction; @@ -57,7 +56,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { private static final int DEFAULT_TICK_TUPLE_INTERVAL_SECS = 15; private static final Integer DEFAULT_MAX_OPEN_FILES = 50; - protected Map<String, AbstractHDFSWriter> writers; + protected Map<String, Writer> writers; protected Map<String, Integer> rotationCounterMap = new HashMap<>(); protected List<RotationAction> rotationActions = new ArrayList<>(); protected OutputCollector collector; @@ -78,7 +77,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { protected transient Configuration hdfsConfig; - protected void rotateOutputFile(AbstractHDFSWriter writer) throws IOException { + protected void rotateOutputFile(Writer writer) throws IOException { LOG.info("Rotating output file..."); long start = System.currentTimeMillis(); synchronized (this.writeLock) { @@ -136,7 +135,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { synchronized (this.writeLock) { boolean forceSync = false; - AbstractHDFSWriter writer = null; + Writer writer = null; String writerKey = null; if (TupleUtils.isTick(tuple)) { @@ -202,8 +201,8 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } } - private AbstractHDFSWriter getOrCreateWriter(String writerKey, Tuple tuple) throws IOException { - AbstractHDFSWriter writer; + private Writer getOrCreateWriter(String writerKey, Tuple tuple) throws IOException { + Writer writer; writer = writers.get(writerKey); if (writer == null) { @@ -229,7 +228,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { return boltKey + "****" + partitionDir; } - void doRotationAndRemoveWriter(String writerKey, AbstractHDFSWriter writer) { + void doRotationAndRemoveWriter(String writerKey, Writer writer) { try { rotateOutputFile(writer); } catch (IOException e) { @@ -258,7 +257,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } private void 
doRotationAndRemoveAllWriters() { - for (final AbstractHDFSWriter writer : writers.values()) { + for (final Writer writer : writers.values()) { try { rotateOutputFile(writer); } catch (IOException e) { @@ -269,7 +268,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } private void syncAllWriters() throws IOException { - for (AbstractHDFSWriter writer : writers.values()) { + for (Writer writer : writers.values()) { writer.sync(); } } @@ -306,9 +305,9 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { abstract protected String getWriterKey(Tuple tuple); - abstract protected AbstractHDFSWriter makeNewWriter(Path path, Tuple tuple) throws IOException; + abstract protected Writer makeNewWriter(Path path, Tuple tuple) throws IOException; - static class WritersMap extends LinkedHashMap<String, AbstractHDFSWriter> { + static class WritersMap extends LinkedHashMap<String, Writer> { final long maxWriters; public WritersMap(long maxWriters) { @@ -317,7 +316,7 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt { } @Override - protected boolean removeEldestEntry(Map.Entry<String, AbstractHDFSWriter> eldest) { + protected boolean removeEldestEntry(Map.Entry<String, Writer> eldest) { return this.size() > this.maxWriters; } } http://git-wip-us.apache.org/repos/asf/storm/blob/d3ece270/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java new file mode 100644 index 0000000..26a0a6f --- /dev/null +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/Writer.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.hdfs.bolt; + +import org.apache.hadoop.fs.Path; +import org.apache.storm.tuple.Tuple; + +import java.io.IOException; + +public interface Writer { + long write(Tuple tuple) throws IOException; + + void sync() throws IOException; + + void close() throws IOException; + + boolean needsRotation(); + + Path getFilePath(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/d3ece270/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java index 4b36377..d73eb7d 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/common/AbstractHDFSWriter.java @@ -18,25 +18,26 @@ package org.apache.storm.hdfs.common; import org.apache.hadoop.fs.Path; +import org.apache.storm.hdfs.bolt.Writer; import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy; import org.apache.storm.tuple.Tuple; import java.io.IOException; -abstract public class 
AbstractHDFSWriter { - long lastUsedTime; - long offset; - boolean needsRotation; - Path filePath; - FileRotationPolicy rotationPolicy; +abstract public class AbstractHDFSWriter implements Writer { + protected long lastUsedTime; + protected long offset; + protected boolean needsRotation; + final protected Path filePath; + final protected FileRotationPolicy rotationPolicy; - AbstractHDFSWriter(FileRotationPolicy policy, Path path) { + public AbstractHDFSWriter(FileRotationPolicy policy, Path path) { //This must be defensively copied, because a bolt probably has only one rotation policy object this.rotationPolicy = policy.copy(); this.filePath = path; } - final public long write(Tuple tuple) throws IOException{ + final public long write(Tuple tuple) throws IOException { doWrite(tuple); this.needsRotation = rotationPolicy.mark(tuple, offset);
