Component docs
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5e0c0faf Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5e0c0faf Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5e0c0faf Branch: refs/heads/master Commit: 5e0c0faf3d598058b7c43639eef467f21fd0ad88 Parents: adf1df7 Author: Claus Ibsen <davscl...@apache.org> Authored: Thu May 7 12:02:37 2015 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Thu May 7 12:04:39 2015 +0200 ---------------------------------------------------------------------- .../camel/component/hdfs/HdfsConfiguration.java | 2 +- .../camel/component/hdfs2/HdfsComponent.java | 3 + .../component/hdfs2/HdfsConfiguration.java | 112 +++++++++++++++++-- 3 files changed, 107 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5e0c0faf/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index 1ddcf72..0a0bc86 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -213,7 +213,7 @@ public class HdfsConfiguration { public void parseURI(URI uri) throws URISyntaxException { String protocol = uri.getScheme(); if (!protocol.equalsIgnoreCase("hdfs")) { - throw new IllegalArgumentException("Unrecognized Cache protocol: " + protocol + " for uri: " + uri); + throw new IllegalArgumentException("Unrecognized protocol: " + protocol + " for uri: " + uri); } hostName = uri.getHost(); if (hostName == null) { http://git-wip-us.apache.org/repos/asf/camel/blob/5e0c0faf/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java index 1b05c35..b854422 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsComponent.java @@ -67,6 +67,9 @@ public class HdfsComponent extends UriEndpointComponent { return auth; } + /** + * To use the given configuration for security with JAAS. + */ static void setJAASConfiguration(Configuration auth) { if (auth != null) { LOG.trace("Restoring existing JAAS Configuration {}", auth); http://git-wip-us.apache.org/repos/asf/camel/blob/5e0c0faf/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java index 7dd5e55..c673aa2 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConfiguration.java @@ -33,18 +33,21 @@ import org.apache.hadoop.io.SequenceFile; public class HdfsConfiguration { private URI uri; + private boolean wantAppend; + private List<HdfsProducer.SplitStrategy> splitStrategies; + @UriPath @Metadata(required = "true") private String hostName; @UriPath(defaultValue = "" + HdfsConstants.DEFAULT_PORT) private int port = HdfsConstants.DEFAULT_PORT; @UriPath @Metadata(required = "true") private String path; - @UriParam(defaultValue = "true") + @UriParam(label = "producer", defaultValue = "true") private boolean overwrite = true; - @UriParam + @UriParam(label = "producer") private boolean append; @UriParam - private boolean wantAppend; + private String splitStrategy; @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_BUFFERSIZE) private int bufferSize = HdfsConstants.DEFAULT_BUFFERSIZE; @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_REPLICATION) @@ -67,17 +70,16 @@ public class HdfsConfiguration { private String openedSuffix = HdfsConstants.DEFAULT_OPENED_SUFFIX; @UriParam(defaultValue = HdfsConstants.DEFAULT_READ_SUFFIX) private String readSuffix = HdfsConstants.DEFAULT_READ_SUFFIX; - @UriParam + @UriParam(label = "consumer") private long initialDelay; - @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_DELAY) + @UriParam(label = "consumer", defaultValue = "" + HdfsConstants.DEFAULT_DELAY) private long delay = HdfsConstants.DEFAULT_DELAY; - @UriParam(defaultValue = HdfsConstants.DEFAULT_PATTERN) + @UriParam(label = "consumer", defaultValue = HdfsConstants.DEFAULT_PATTERN) private String pattern = HdfsConstants.DEFAULT_PATTERN; @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_BUFFERSIZE) private int chunkSize = HdfsConstants.DEFAULT_BUFFERSIZE; @UriParam(defaultValue = "" + HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL) private int checkIdleInterval = HdfsConstants.DEFAULT_CHECK_IDLE_INTERVAL; - private List<HdfsProducer.SplitStrategy> splitStrategies; @UriParam(defaultValue = "true") private boolean connectOnStartup = true; @UriParam @@ -211,7 +213,7 @@ public class HdfsConfiguration { public void parseURI(URI uri) throws URISyntaxException { String protocol = uri.getScheme(); if (!protocol.equalsIgnoreCase("hdfs2")) { - throw new IllegalArgumentException("Unrecognized Cache protocol: " + protocol + " for uri: " + uri); + throw new IllegalArgumentException("Unrecognized protocol: " + protocol + " for uri: " + uri); } hostName = uri.getHost(); if (hostName == null) { @@ -254,6 +256,9 @@ public class HdfsConfiguration { return hostName; } + /** + * HDFS host to use + */ public void setHostName(String hostName) { this.hostName = hostName; } @@ -262,6 +267,9 @@ public class HdfsConfiguration { return port; } + /** + * HDFS port to use + */ public void setPort(int port) { this.port = port; } @@ -270,6 +278,9 @@ public class HdfsConfiguration { return path; } + /** + * The directory path to use + */ public void setPath(String path) { this.path = path; } @@ -278,6 +289,9 @@ public class HdfsConfiguration { return overwrite; } + /** + * Whether to overwrite existing files with the same name + */ public void setOverwrite(boolean overwrite) { this.overwrite = overwrite; } @@ -290,6 +304,9 @@ public class HdfsConfiguration { return wantAppend; } + /** + * Append to existing file. Notice that not all HDFS file systems support the append option. + */ public void setAppend(boolean append) { this.append = append; } @@ -298,6 +315,9 @@ public class HdfsConfiguration { return bufferSize; } + /** + * The buffer size used by HDFS + */ public void setBufferSize(int bufferSize) { this.bufferSize = bufferSize; } @@ -306,6 +326,9 @@ public class HdfsConfiguration { return replication; } + /** + * The HDFS replication factor + */ public void setReplication(short replication) { this.replication = replication; } @@ -314,6 +337,9 @@ public class HdfsConfiguration { return blockSize; } + /** + * The size of the HDFS blocks + */ public void setBlockSize(long blockSize) { this.blockSize = blockSize; } @@ -322,6 +348,9 @@ public class HdfsConfiguration { return fileType; } + /** + * The file type to use. For more details see Hadoop HDFS documentation about the various files types. + */ public void setFileType(HdfsFileType fileType) { this.fileType = fileType; } @@ -330,6 +359,9 @@ public class HdfsConfiguration { return compressionType; } + /** + * The compression type to use (is default not in use) + */ public void setCompressionType(SequenceFile.CompressionType compressionType) { this.compressionType = compressionType; } @@ -338,10 +370,16 @@ public class HdfsConfiguration { return compressionCodec; } + /** + * The compression codec to use + */ public void setCompressionCodec(HdfsCompressionCodec compressionCodec) { this.compressionCodec = compressionCodec; } + /** + * Set to LOCAL to not use HDFS but local java.io.File instead. + */ public void setFileSystemType(HdfsFileSystemType fileSystemType) { this.fileSystemType = fileSystemType; } @@ -354,6 +392,9 @@ public class HdfsConfiguration { return keyType; } + /** + * The type for the key in case of sequence or map files. + */ public void setKeyType(HdfsWritableFactories.WritableType keyType) { this.keyType = keyType; } @@ -362,10 +403,16 @@ public class HdfsConfiguration { return valueType; } + /** + * The type for the key in case of sequence or map files + */ public void setValueType(HdfsWritableFactories.WritableType valueType) { this.valueType = valueType; } + /** + * When a file is opened for reading/writing the file is renamed with this suffix to avoid to read it during the writing phase. + */ public void setOpenedSuffix(String openedSuffix) { this.openedSuffix = openedSuffix; } @@ -374,6 +421,9 @@ public class HdfsConfiguration { return openedSuffix; } + /** + * Once the file has been read is renamed with this suffix to avoid to read it again. + */ public void setReadSuffix(String readSuffix) { this.readSuffix = readSuffix; } @@ -382,6 +432,9 @@ public class HdfsConfiguration { return readSuffix; } + /** + * For the consumer, how much to wait (milliseconds) before to start scanning the directory. + */ public void setInitialDelay(long initialDelay) { this.initialDelay = initialDelay; } @@ -390,6 +443,9 @@ public class HdfsConfiguration { return initialDelay; } + /** + * The interval (milliseconds) between the directory scans. + */ public void setDelay(long delay) { this.delay = delay; } @@ -398,6 +454,9 @@ public class HdfsConfiguration { return delay; } + /** + * The pattern used for scanning the directory + */ public void setPattern(String pattern) { this.pattern = pattern; } @@ -406,6 +465,9 @@ public class HdfsConfiguration { return pattern; } + /** + * When reading a normal file, this is split into chunks producing a message per chunk. + */ public void setChunkSize(int chunkSize) { this.chunkSize = chunkSize; } @@ -414,6 +476,9 @@ public class HdfsConfiguration { return chunkSize; } + /** + * How often (time in millis) in to run the idle checker background task. This option is only in use if the splitter strategy is IDLE. + */ public void setCheckIdleInterval(int checkIdleInterval) { this.checkIdleInterval = checkIdleInterval; } @@ -426,14 +491,40 @@ public class HdfsConfiguration { return splitStrategies; } + public String getSplitStrategy() { + return splitStrategy; + } + + /** + * In the current version of Hadoop opening a file in append mode is disabled since it's not very reliable. So, for the moment, + * it's only possible to create new files. The Camel HDFS endpoint tries to solve this problem in this way: + * <ul> + * <li>If the split strategy option has been defined, the hdfs path will be used as a directory and files will be created using the configured UuidGenerator.</li> + * <li>Every time a splitting condition is met, a new file is created.</li> + * </ul> + * The splitStrategy option is defined as a string with the following syntax: + * <br/><tt>splitStrategy=ST:value,ST:value,...</tt> + * <br/>where ST can be: + * <ul> + * <li>BYTES a new file is created, and the old is closed when the number of written bytes is more than value</li> + * <li>MESSAGES a new file is created, and the old is closed when the number of written messages is more than value</li> + * <li>IDLE a new file is created, and the old is closed when no writing happened in the last value milliseconds</li> + * </ul> + */ public void setSplitStrategy(String splitStrategy) { - // noop + this.splitStrategy = splitStrategy; } public boolean isConnectOnStartup() { return connectOnStartup; } + /** + * Whether to connect to the HDFS file system on starting the producer/consumer. + * If false then the connection is created on-demand. Notice that HDFS may take up till 15 minutes to establish + * a connection, as it has hardcoded 45 x 20 sec redelivery. By setting this option to false allows your + * application to startup, and not block for up till 15 minutes. + */ public void setConnectOnStartup(boolean connectOnStartup) { this.connectOnStartup = connectOnStartup; } @@ -442,6 +533,9 @@ public class HdfsConfiguration { return owner; } + /** + * The file owner must match this owner for the consumer to pickup the file. Otherwise the file is skipped. + */ public void setOwner(String owner) { this.owner = owner; }