Component docs

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5e0c0faf
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5e0c0faf
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5e0c0faf

Branch: refs/heads/master
Commit: 5e0c0faf3d598058b7c43639eef467f21fd0ad88
Parents: adf1df7
Author: Claus Ibsen <davscl...@apache.org>
Authored: Thu May 7 12:02:37 2015 +0200
Committer: Claus Ibsen <davscl...@apache.org>
Committed: Thu May 7 12:04:39 2015 +0200

----------------------------------------------------------------------
 .../camel/component/hdfs/HdfsConfiguration.java |   2 +-
 .../camel/component/hdfs2/HdfsComponent.java    |   3 +
 .../component/hdfs2/HdfsConfiguration.java      | 112 +++++++++++++++++--
 3 files changed, 107 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/5e0c0faf/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index 1ddcf72..0a0bc86 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -213,7 +213,7 @@ public class HdfsConfiguration {
     public void parseURI(URI uri) throws URISyntaxException {
         String protocol = uri.getScheme();
         if (!protocol.equalsIgnoreCase("hdfs")) {
-            throw new IllegalArgumentException("Unrecognized Cache protocol: " 
+ protocol + " for uri: " + uri);
+            throw new IllegalArgumentException("Unrecognized protocol: " + 
protocol + " for uri: " + uri);
         }
         hostName = uri.getHost();
         if (hostName == null) {

http://git-wip-us.apache.org/repos/asf/camel/blob/5e0c0faf/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
index 1b05c35..b854422 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java
@@ -67,6 +67,9 @@ public class HdfsComponent extends UriEndpointComponent {
         return auth;
     }
 
+    /**
+     * To use the given configuration for security with JAAS.
+     */
     static void setJAASConfiguration(Configuration auth) {
         if (auth != null) {
             LOG.trace("Restoring existing JAAS Configuration {}", auth);

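For context, a minimal sketch of the JAAS save/restore pattern this helper supports. This is an illustration, not part of the commit; it assumes the Configuration type in play is javax.security.auth.login.Configuration, and the class name is hypothetical:

    import javax.security.auth.login.Configuration;

    public class JaasRestoreExample {
        public static void main(String[] args) {
            // Capture whatever JAAS Configuration is currently installed.
            Configuration previous = Configuration.getConfiguration();
            try {
                // ... login code (e.g. Hadoop/Kerberos) that may install its own
                // JAAS Configuration would run here ...
            } finally {
                // Restore the captured Configuration afterwards; this is the kind of
                // restore that setJAASConfiguration performs for a non-null auth.
                Configuration.setConfiguration(previous);
            }
        }
    }
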
http://git-wip-us.apache.org/repos/asf/camel/blob/5e0c0faf/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
index 7dd5e55..c673aa2 100644
--- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
+++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java
@@ -33,18 +33,21 @@ import org.apache.hadoop.io.SequenceFile;
 public class HdfsConfiguration {
 
     private URI uri;
+    private boolean wantAppend;
+    private List<HdfsProducer.SplitStrategy> splitStrategies;
+
     @UriPath @Metadata(required = "true")
     private String hostName;
     @UriPath(defaultValue = "" + HdfsConstants.DEFAULT_PORT)
     private int port = HdfsConstants.DEFAULT_PORT;
     @UriPath @Metadata(required = "true")
     private String path;
-    @UriParam(defaultValue = "true")
+    @UriParam(label = "producer", defaultValue = "true")
     private boolean overwrite = true;
-    @UriParam
+    @UriParam(label = "producer")
     private boolean append;
     @UriParam
-    private boolean wantAppend;
+    private String splitStrategy;
     @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_BUFFERSIZE)
     private int bufferSize = HdfsConstants.DEFAULT_BUFFERSIZE;
     @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_REPLICATION)
@@ -67,17 +70,16 @@ public class HdfsConfiguration {
     private String openedSuffix = HdfsConstants.DEFAULT_OPENED_SUFFIX;
     @UriParam(defaultValue = HdfsConstants.DEFAULT_READ_SUFFIX)
     private String readSuffix = HdfsConstants.DEFAULT_READ_SUFFIX;
-    @UriParam
+    @UriParam(label = "consumer")
     private long initialDelay;
-    @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_DELAY)
+    @UriParam(label = "consumer", defaultValue = "" + 
HdfsConstants.DEFAULT_DELAY)
     private long delay = HdfsConstants.DEFAULT_DELAY;
-    @UriParam(defaultValue = HdfsConstants.DEFAULT_PATTERN)
+    @UriParam(label = "consumer", defaultValue = HdfsConstants.DEFAULT_PATTERN)
     private String pattern = HdfsConstants.DEFAULT_PATTERN;
     @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_BUFFERSIZE)
     private int chunkSize = HdfsConstants.DEFAULT_BUFFERSIZE;
     @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL)
     private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL;
-    private List<HdfsProducer.SplitStrategy> splitStrategies;
     @UriParam(defaultValue = "true")
     private boolean connectOnStartup = true;
     @UriParam
@@ -211,7 +213,7 @@ public class HdfsConfiguration {
     public void parseURI(URI uri) throws URISyntaxException {
         String protocol = uri.getScheme();
         if (!protocol.equalsIgnoreCase("hdfs2")) {
-            throw new IllegalArgumentException("Unrecognized Cache protocol: " 
+ protocol + " for uri: " + uri);
+            throw new IllegalArgumentException("Unrecognized protocol: " + 
protocol + " for uri: " + uri);
         }
         hostName = uri.getHost();
         if (hostName == null) {
@@ -254,6 +256,9 @@ public class HdfsConfiguration {
         return hostName;
     }
 
+    /**
+     * HDFS host to use
+     */
     public void setHostName(String hostName) {
         this.hostName = hostName;
     }
@@ -262,6 +267,9 @@ public class HdfsConfiguration {
         return port;
     }
 
+    /**
+     * HDFS port to use
+     */
     public void setPort(int port) {
         this.port = port;
     }
@@ -270,6 +278,9 @@ public class HdfsConfiguration {
         return path;
     }
 
+    /**
+     * The directory path to use
+     */
     public void setPath(String path) {
         this.path = path;
     }
@@ -278,6 +289,9 @@ public class HdfsConfiguration {
         return overwrite;
     }
 
+    /**
+     * Whether to overwrite existing files with the same name
+     */
     public void setOverwrite(boolean overwrite) {
         this.overwrite = overwrite;
     }
@@ -290,6 +304,9 @@ public class HdfsConfiguration {
         return wantAppend;
     }
 
+    /**
+     * Append to existing file. Notice that not all HDFS file systems support the append option.
+     */
     public void setAppend(boolean append) {
         this.append = append;
     }
@@ -298,6 +315,9 @@ public class HdfsConfiguration {
         return bufferSize;
     }
 
+    /**
+     * The buffer size used by HDFS
+     */
     public void setBufferSize(int bufferSize) {
         this.bufferSize = bufferSize;
     }
@@ -306,6 +326,9 @@ public class HdfsConfiguration {
         return replication;
     }
 
+    /**
+     * The HDFS replication factor
+     */
     public void setReplication(short replication) {
         this.replication = replication;
     }
@@ -314,6 +337,9 @@ public class HdfsConfiguration {
         return blockSize;
     }
 
+    /**
+     * The size of the HDFS blocks
+     */
     public void setBlockSize(long blockSize) {
         this.blockSize = blockSize;
     }
@@ -322,6 +348,9 @@ public class HdfsConfiguration {
         return fileType;
     }
 
+    /**
+     * The file type to use. For more details see the Hadoop HDFS documentation about the various file types.
+     */
     public void setFileType(HdfsFileType fileType) {
         this.fileType = fileType;
     }
@@ -330,6 +359,9 @@ public class HdfsConfiguration {
         return compressionType;
     }
 
+    /**
+     * The compression type to use (not in use by default)
+     */
     public void setCompressionType(SequenceFile.CompressionType compressionType) {
         this.compressionType = compressionType;
     }
@@ -338,10 +370,16 @@ public class HdfsConfiguration {
         return compressionCodec;
     }
 
+    /**
+     * The compression codec to use
+     */
     public void setCompressionCodec(HdfsCompressionCodec compressionCodec) {
         this.compressionCodec = compressionCodec;
     }
 
+    /**
+     * Set to LOCAL to use the local file system (java.io.File) instead of HDFS.
+     */
     public void setFileSystemType(HdfsFileSystemType fileSystemType) {
         this.fileSystemType = fileSystemType;
     }
@@ -354,6 +392,9 @@ public class HdfsConfiguration {
         return keyType;
     }
 
+    /**
+     * The type for the key in case of sequence or map files.
+     */
     public void setKeyType(HdfsWritableFactories.WritableType keyType) {
         this.keyType = keyType;
     }
@@ -362,10 +403,16 @@ public class HdfsConfiguration {
         return valueType;
     }
 
+    /**
+     * The type for the value in case of sequence or map files.
+     */
     public void setValueType(HdfsWritableFactories.WritableType valueType) {
         this.valueType = valueType;
     }
 
+    /**
+     * When a file is opened for reading/writing, the file is renamed with this suffix to avoid reading it during the writing phase.
+     */
     public void setOpenedSuffix(String openedSuffix) {
         this.openedSuffix = openedSuffix;
     }
@@ -374,6 +421,9 @@ public class HdfsConfiguration {
         return openedSuffix;
     }
 
+    /**
+     * Once the file has been read it is renamed with this suffix to avoid reading it again.
+     */
     public void setReadSuffix(String readSuffix) {
         this.readSuffix = readSuffix;
     }
@@ -382,6 +432,9 @@ public class HdfsConfiguration {
         return readSuffix;
     }
 
+    /**
+     * For the consumer, how long to wait (in milliseconds) before it starts scanning the directory.
+     */
     public void setInitialDelay(long initialDelay) {
         this.initialDelay = initialDelay;
     }
@@ -390,6 +443,9 @@ public class HdfsConfiguration {
         return initialDelay;
     }
 
+    /**
+     * The interval (milliseconds) between the directory scans.
+     */
     public void setDelay(long delay) {
         this.delay = delay;
     }
@@ -398,6 +454,9 @@ public class HdfsConfiguration {
         return delay;
     }
 
+    /**
+     * The pattern used for scanning the directory
+     */
     public void setPattern(String pattern) {
         this.pattern = pattern;
     }
@@ -406,6 +465,9 @@ public class HdfsConfiguration {
         return pattern;
     }
 
+    /**
+     * When reading a normal file, the file is split into chunks, producing a message per chunk.
+     */
     public void setChunkSize(int chunkSize) {
         this.chunkSize = chunkSize;
     }
@@ -414,6 +476,9 @@ public class HdfsConfiguration {
         return chunkSize;
     }
 
+    /**
+     * How often (time in millis) to run the idle checker background task. This option is only in use if the splitter strategy is IDLE.
+     */
     public void setCheckIdleInterval(int checkIdleInterval) {
         this.checkIdleInterval = checkIdleInterval;
     }
@@ -426,14 +491,40 @@ public class HdfsConfiguration {
         return splitStrategies;
     }
 
+    public String getSplitStrategy() {
+        return splitStrategy;
+    }
+
+    /**
+     * In the current version of Hadoop opening a file in append mode is disabled since it's not very reliable. So, for the moment,
+     * it's only possible to create new files. The Camel HDFS endpoint tries to solve this problem in this way:
+     * <ul>
+     * <li>If the split strategy option has been defined, the hdfs path will be used as a directory and files will be created using the configured UuidGenerator.</li>
+     * <li>Every time a splitting condition is met, a new file is created.</li>
+     * </ul>
+     * The splitStrategy option is defined as a string with the following syntax:
+     * <br/><tt>splitStrategy=ST:value,ST:value,...</tt>
+     * <br/>where ST can be:
+     * <ul>
+     * <li>BYTES a new file is created, and the old is closed when the number of written bytes is more than value</li>
+     * <li>MESSAGES a new file is created, and the old is closed when the number of written messages is more than value</li>
+     * <li>IDLE a new file is created, and the old is closed when no writing happened in the last value milliseconds</li>
+     * </ul>
+     */
     public void setSplitStrategy(String splitStrategy) {
-        // noop
+        this.splitStrategy = splitStrategy;
     }
 
     public boolean isConnectOnStartup() {
         return connectOnStartup;
     }
 
+    /**
+     * Whether to connect to the HDFS file system on starting the producer/consumer.
+     * If false then the connection is created on-demand. Notice that HDFS may take up to 15 minutes to establish
+     * a connection, as it has a hardcoded 45 x 20 sec redelivery. Setting this option to false allows your
+     * application to start up without blocking for up to 15 minutes.
+     */
     public void setConnectOnStartup(boolean connectOnStartup) {
         this.connectOnStartup = connectOnStartup;
     }
@@ -442,6 +533,9 @@ public class HdfsConfiguration {
         return owner;
     }
 
+    /**
+     * The file owner must match this owner for the consumer to pick up the file. Otherwise the file is skipped.
+     */
     public void setOwner(String owner) {
         this.owner = owner;
     }
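
For illustration, a sketch of a producer route exercising the splitStrategy and connectOnStartup options documented in the diff above. The host, port, path and option values are placeholders, not part of this commit:

    import org.apache.camel.builder.RouteBuilder;

    public class HdfsSplitRouteExample extends RouteBuilder {
        @Override
        public void configure() {
            // Write messages to HDFS, rolling to a new file once roughly 1 MB has
            // been written (BYTES) or after 60 seconds without a write (IDLE).
            // connectOnStartup=false defers the HDFS connection until first use.
            from("direct:input")
                .to("hdfs2://localhost:8020/tmp/camel-output"
                    + "?splitStrategy=BYTES:1048576,IDLE:60000"
                    + "&connectOnStartup=false");
        }
    }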
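
Likewise, a consumer-side sketch using the options this commit labels as consumer options (initialDelay, delay, pattern) together with chunkSize; all values are assumptions for the example:

    import org.apache.camel.builder.RouteBuilder;

    public class HdfsConsumerRouteExample extends RouteBuilder {
        @Override
        public void configure() {
            // Wait 5s before the first directory scan, scan every 30s thereafter,
            // only pick up files matching *.txt, and emit one message per 8 KB chunk.
            from("hdfs2://localhost:8020/tmp/camel-input"
                + "?initialDelay=5000&delay=30000&pattern=*.txt&chunkSize=8192")
                .to("log:hdfs-consumer");
        }
    }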
