[ https://issues.apache.org/jira/browse/STORM-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14041590#comment-14041590 ]
ASF GitHub Bot commented on STORM-211:
--------------------------------------
Github user ptgoetz commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/128#discussion_r14109580
--- Diff: external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java ---
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.format.SequenceFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+public class SequenceFileBolt extends AbstractHdfsBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(SequenceFileBolt.class);
+
+ private SequenceFormat format;
+ private SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
+ private SequenceFile.Writer writer;
+
+ private String compressionCodec = "default";
+ private transient CompressionCodecFactory codecFactory;
+
+ public SequenceFileBolt() {
+ }
+
+ public SequenceFileBolt withCompressionCodec(String codec){
+ this.compressionCodec = codec;
+ return this;
+ }
+
+ public SequenceFileBolt withFsUrl(String fsUrl) {
+ this.fsUrl = fsUrl;
+ return this;
+ }
+
+ public SequenceFileBolt withConfigKey(String configKey){
+ this.configKey = configKey;
+ return this;
+ }
+
+ public SequenceFileBolt withFileNameFormat(FileNameFormat fileNameFormat) {
+ this.fileNameFormat = fileNameFormat;
+ return this;
+ }
+
+ public SequenceFileBolt withSequenceFormat(SequenceFormat format) {
+ this.format = format;
+ return this;
+ }
+
+ public SequenceFileBolt withSyncPolicy(SyncPolicy syncPolicy) {
+ this.syncPolicy = syncPolicy;
+ return this;
+ }
+
+ public SequenceFileBolt withRotationPolicy(FileRotationPolicy rotationPolicy) {
+ this.rotationPolicy = rotationPolicy;
+ return this;
+ }
+
+ public SequenceFileBolt withCompressionType(SequenceFile.CompressionType compressionType){
+ this.compressionType = compressionType;
+ return this;
+ }
+
+ public SequenceFileBolt addRotationAction(RotationAction action){
+ this.rotationActions.add(action);
+ return this;
+ }
+
+ @Override
+ public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
+ LOG.info("Preparing Sequence File Bolt...");
+ if (this.format == null) throw new IllegalStateException("SequenceFormat must be specified.");
+
+ this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
+ this.codecFactory = new CompressionCodecFactory(hdfsConfig);
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ try {
+ this.writer.append(this.format.key(tuple), this.format.value(tuple));
+ long offset = this.writer.getLength();
+ this.collector.ack(tuple);
+
+ if (this.syncPolicy.mark(tuple, offset)) {
+ this.writer.hsync();
--- End diff ---
I'm okay with targeting 2.x. I'll update the docs.
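For reference, here is a rough sketch of how the fluent configuration API shown in the diff above might be wired up when building a topology. The concrete DefaultFileNameFormat, DefaultSequenceFormat, CountSyncPolicy, and FileSizeRotationPolicy classes below are assumed helper implementations from elsewhere in the storm-hdfs module, not part of the quoted diff; only the with*() methods come from SequenceFileBolt itself:

    // Rough usage sketch of the SequenceFileBolt builder API shown above.
    // DefaultFileNameFormat, DefaultSequenceFormat, CountSyncPolicy and
    // FileSizeRotationPolicy are assumed helpers, not part of the quoted diff.
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);             // hsync every 1000 tuples
    FileRotationPolicy rotationPolicy =
            new FileSizeRotationPolicy(127.0f, FileSizeRotationPolicy.Units.MB); // rotate at ~127 MB
    FileNameFormat fileNameFormat = new DefaultFileNameFormat()
            .withPath("/storm/sequence/");
    SequenceFormat format = new DefaultSequenceFormat("timestamp", "sentence");

    SequenceFileBolt bolt = new SequenceFileBolt()
            .withFsUrl("hdfs://namenode.example.com:8020")
            .withFileNameFormat(fileNameFormat)
            .withSequenceFormat(format)
            .withSyncPolicy(syncPolicy)
            .withRotationPolicy(rotationPolicy)
            .withCompressionType(SequenceFile.CompressionType.RECORD)
            .withCompressionCodec("deflate");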
> Add module for HDFS integration
> -------------------------------
>
> Key: STORM-211
> URL: https://issues.apache.org/jira/browse/STORM-211
> Project: Apache Storm (Incubating)
> Issue Type: Sub-task
> Reporter: P. Taylor Goetz
>
> Add a module with generic components (storm, trident) for interacting with
> HDFS:
> - Write to regular and sequence files
> - Core bolts and a Trident state implementation.
> - Integrate with secure (kerberos-enabled) HDFS
--
This message was sent by Atlassian JIRA
(v6.2#6252)