[
https://issues.apache.org/jira/browse/STORM-211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14012938#comment-14012938
]
ASF GitHub Bot commented on STORM-211:
--------------------------------------
Github user revans2 commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/128#discussion_r13205800
--- Diff:
external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
---
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.hdfs.bolt;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Map;
+
+public abstract class AbstractHdfsBolt extends BaseRichBolt {
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractHdfsBolt.class);
+
+ protected ArrayList<RotationAction> rotationActions = new
ArrayList<RotationAction>();
+ private Path currentFile;
+ protected OutputCollector collector;
+ protected FileSystem fs;
+ protected SyncPolicy syncPolicy;
+ protected FileRotationPolicy rotationPolicy;
+ protected FileNameFormat fileNameFormat;
+ protected int rotation = 0;
+ protected String fsUrl;
+ protected String configKey;
+// protected String path;
+
+ protected Configuration hdfsConfig;
+
+ protected void rotateOutputFile() throws IOException {
+ LOG.info("Rotating output file...");
+ long start = System.currentTimeMillis();
+ closeOutputFile();
+ this.rotation++;
+
+ Path newFile = createOutputFile();
+ LOG.info("Performing {} file rotation actions.",
this.rotationActions.size());
+ for(RotationAction action : this.rotationActions){
+ action.execute(this.fs, this.currentFile);
+ }
+ this.currentFile = newFile;
+ long time = System.currentTimeMillis() - start;
+ LOG.info("File rotation took {} ms.", time);
+ }
+
+ public void prepare(Map conf, TopologyContext topologyContext,
OutputCollector collector){
--- End diff --
It might be good to mark this as final and add some javadocs so that it is
obvious that doPrepare is the extension point and not prepare.
> Add module for HDFS integration
> -------------------------------
>
> Key: STORM-211
> URL: https://issues.apache.org/jira/browse/STORM-211
> Project: Apache Storm (Incubating)
> Issue Type: Sub-task
> Reporter: P. Taylor Goetz
>
> Add a module with generic components (storm, trident) for interacting with
> HDFS:
> - Write to regular and sequence files
> - Core bolts, and Trident state implementation.
> - Integrate with secure (kerberos-enabled) HDFS
--
This message was sent by Atlassian JIRA
(v6.2#6252)