Repository: samza Updated Branches: refs/heads/master 85830be9c -> c5348bf6b
SAMZA-2041: add hdfs and kinesis descriptor Author: Hai Lu <[email protected]> Reviewers: Xinyu Liu <[email protected]> Closes #857 from lhaiesp/master Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/c5348bf6 Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/c5348bf6 Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/c5348bf6 Branch: refs/heads/master Commit: c5348bf6b1fcaf7f9db841b815b6b5eeb9937395 Parents: 85830be Author: Hai Lu <[email protected]> Authored: Fri Dec 14 14:35:45 2018 -0800 Committer: Hai Lu <[email protected]> Committed: Fri Dec 14 14:35:45 2018 -0800 ---------------------------------------------------------------------- .../documentation/versioned/connectors/hdfs.md | 48 ++-- .../versioned/connectors/kinesis.md | 102 +++++--- .../samza/system/kinesis/KinesisConfig.java | 22 +- .../descriptors/KinesisInputDescriptor.java | 123 +++++++++ .../descriptors/KinesisSystemDescriptor.java | 139 ++++++++++ .../descriptors/TestKinesisInputDescriptor.java | 58 +++++ .../TestKinesisSystemDescriptor.java | 57 +++++ .../hdfs/descriptors/HdfsInputDescriptor.java | 48 ++++ .../hdfs/descriptors/HdfsOutputDescriptor.java | 46 ++++ .../hdfs/descriptors/HdfsSystemDescriptor.java | 255 +++++++++++++++++++ .../descriptors/TestHdfsSystemDescriptor.java | 55 ++++ 11 files changed, 883 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/docs/learn/documentation/versioned/connectors/hdfs.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/connectors/hdfs.md b/docs/learn/documentation/versioned/connectors/hdfs.md index ece7bbf..822be7e 100644 --- a/docs/learn/documentation/versioned/connectors/hdfs.md +++ b/docs/learn/documentation/versioned/connectors/hdfs.md @@ -47,14 +47,11 @@ While streaming sources like Kafka are 
unbounded, files on HDFS have finite data #### Defining streams -Samza uses the notion of a _system_ to describe any I/O source it interacts with. To consume from HDFS, you should create a new system that points to - `HdfsSystemFactory`. You can then associate multiple streams with this _system_. Each stream should have a _physical name_, which should be set to the name of the directory on HDFS. - -{% highlight jproperties %} -systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory - -streams.hdfs-clickstream.samza.system=hdfs -streams.hdfs-clickstream.samza.physical.name=hdfs:/data/clickstream/2016/09/11 +In Samza's high-level API, you can use `HdfsSystemDescriptor` to create an HDFS system. The stream name should be set to the name of the directory on HDFS. +{% highlight java %} +HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream"); +HdfsInputDescriptor hid = hsd.getInputDescriptor("/data/clickstream/2016/09/11"); {% endhighlight %} The above example defines a system called `hdfs-clickstream` with a stream that reads data from the `/data/clickstream/2016/09/11` directory. @@ -62,9 +59,10 @@ The above example defines a stream called `hdfs-clickstream` that reads data fro #### Whitelists & Blacklists If you only want to consume from files that match a certain pattern, you can configure a whitelist. Likewise, you can also blacklist consuming from certain files. When both are specified, the _whitelist_ selects the files to consume and the _blacklist_ is then applied to its results.
-{% highlight jproperties %} -systems.hdfs.partitioner.defaultPartitioner.whitelist=.*avro -systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro +{% highlight java %} +HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream") + .withConsumerWhiteList(".*avro") + .withConsumerBlackList("somefile.avro"); {% endhighlight %} @@ -74,34 +72,34 @@ systems.hdfs.partitioner.defaultPartitioner.blacklist=somefile.avro Samza allows writing your output results to HDFS in Avro format. You can either use Avro's GenericRecords or have Samza automatically infer the schema for your object using reflection. -{% highlight jproperties %} -# set the SystemFactory implementation to instantiate HdfsSystemProducer aliased to 'hdfs' -systems.hdfs.samza.factory=org.apache.samza.system.hdfs.HdfsSystemFactory -systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter +{% highlight java %} +HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream") + .withWriterClassName(AvroDataFileHdfsWriter.class.getName()); {% endhighlight %} -If your output is non-avro, you can describe its format by implementing your own serializer. -{% highlight jproperties %} -systems.hdfs.producer.hdfs.writer.class=org.apache.samza.system.hdfs.writer.TextSequenceFileHdfsWriter -serializers.registry.my-serde-name.class=MySerdeFactory -systems.hdfs.samza.msg.serde=my-serde-name +If your output is non-Avro, use `TextSequenceFileHdfsWriter`. +{% highlight java %} +HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream") + .withWriterClassName(TextSequenceFileHdfsWriter.class.getName()); {% endhighlight %} #### Output directory structure Samza allows you to control the base HDFS directory to write your output. You can also organize the output into sub-directories depending on the time your application ran, by configuring a date-formatter.
-{% highlight jproperties %} -systems.hdfs.producer.hdfs.base.output.dir=/user/me/analytics/clickstream_data -systems.hdfs.producer.hdfs.bucketer.date.path.format=yyyy_MM_dd +{% highlight java %} +HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream") + .withOutputBaseDir("/user/me/analytics/clickstream_data") + .withDatePathFormat("yyyy_MM_dd"); {% endhighlight %} You can configure the maximum size of each file or the maximum number of records per file. Once either limit is reached, Samza will create a new file. -{% highlight jproperties %} -systems.hdfs.producer.hdfs.write.batch.size.bytes=134217728 -systems.hdfs.producer.hdfs.write.batch.size.records=10000 +{% highlight java %} +HdfsSystemDescriptor hsd = new HdfsSystemDescriptor("hdfs-clickstream") + .withWriteBatchSizeBytes(134217728) + .withWriteBatchSizeRecords(10000); {% endhighlight %} ### Security http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/docs/learn/documentation/versioned/connectors/kinesis.md ---------------------------------------------------------------------- diff --git a/docs/learn/documentation/versioned/connectors/kinesis.md b/docs/learn/documentation/versioned/connectors/kinesis.md index e319e92..57dae9c 100644 --- a/docs/learn/documentation/versioned/connectors/kinesis.md +++ b/docs/learn/documentation/versioned/connectors/kinesis.md @@ -36,22 +36,16 @@ wraps the Record into a [KinesisIncomingMessageEnvelope](https://github.com/apac #### Basic Configuration -Here is the required configuration for consuming messages from Kinesis. - -{% highlight jproperties %} -// Define a Kinesis system factory with your identifier.
eg: kinesis-system -systems.kinesis-system.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory - -// Kinesis consumer works with only AllSspToSingleTaskGrouperFactory -job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory - -// Define your streams -task.inputs=kinesis-system.input0 - -// Define required properties for your streams -systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION -systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY -sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY +Here is the required configuration for consuming messages from Kinesis, using `KinesisSystemDescriptor` and `KinesisInputDescriptor`. + +{% highlight java %} +KinesisSystemDescriptor ksd = new KinesisSystemDescriptor("kinesis"); + +KinesisInputDescriptor<KV<String, byte[]>> kid = + ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>()) + .withRegion("STREAM-REGION") + .withAccessKey("YOUR-ACCESS_KEY") + .withSecretKey("YOUR-SECRET-KEY"); {% endhighlight %} #### Coordination @@ -66,10 +60,12 @@ job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.str Each Kinesis stream in a given AWS [region](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/Concepts.RegionsAndAvailabilityZones.html) can be accessed by providing an [access key](https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys). An access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`), which you can use to send programmatic requests to AWS.
-{% highlight jproperties %} -systems.kinesis-system.streams.input0.aws.region=YOUR-STREAM-REGION -systems.kinesis-system.streams.input0.aws.accessKey=YOUR-ACCESS_KEY -sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY +{% highlight java %} +KinesisInputDescriptor<KV<String, byte[]>> kid = + ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>()) + .withRegion("STREAM-REGION") + .withAccessKey("YOUR-ACCESS_KEY") + .withSecretKey("YOUR-SECRET-KEY"); {% endhighlight %} ### Advanced Configuration @@ -77,29 +73,44 @@ sensitive.systems.kinesis-system.streams.input0.aws.secretKey=YOUR-SECRET-KEY #### Kinesis Client Library Configs Samza Kinesis Connector uses the [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-consumers-with-kcl.html#kinesis-record-processor-overview-kcl) (KCL) to access the Kinesis data streams. You can set any [KCL Configuration](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/coordinator/KinesisClientLibConfiguration.java) -for a stream by configuring it with the **systems.system-name.streams.stream-name.aws.kcl.*** prefix. +for a stream by configuring it through `KinesisInputDescriptor`. -{% highlight jproperties %} -systems.system-name.streams.stream-name.aws.kcl.CONFIG-PARAM=CONFIG-VALUE +{% highlight java %} +KinesisInputDescriptor<KV<String, byte[]>> kid = ... + +Map<String, String> kclConfig = new HashMap<>(); +kclConfig.put("CONFIG-PARAM", "CONFIG-VALUE"); + +kid.withKCLConfig(kclConfig); {% endhighlight %} As an example, the configuration below is equivalent to invoking `kclClient#WithTableName(myTable)` on the KCL instance. -{% highlight jproperties %} -systems.system-name.streams.stream-name.aws.kcl.TableName=myTable +{% highlight java %} +KinesisInputDescriptor<KV<String, byte[]>> kid = ...
+ +Map<String, String> kclConfig = new HashMap<>(); +kclConfig.put("TableName", "myTable"); + +kid.withKCLConfig(kclConfig); {% endhighlight %} #### AWS Client configs Samza allows you to specify any [AWS client configs](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) to connect to your Kinesis instance. -You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) with the `systems.your-system-name.aws.clientConfig.*` prefix. +You can configure any [AWS client configuration](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/ClientConfiguration.html) through `KinesisSystemDescriptor`. -{% highlight jproperties %} -systems.system-name.aws.clientConfig.CONFIG-PARAM=CONFIG-VALUE +{% highlight java %} +Map<String, String> awsConfig = new HashMap<>(); +awsConfig.put("CONFIG-PARAM", "CONFIG-VALUE"); + +KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName) + .withAWSConfig(awsConfig); {% endhighlight %} -As an example, to set the *proxy host* and *proxy port* to be used by the Kinesis Client: -{% highlight jproperties %} -systems.system-name.aws.clientConfig.ProxyHost=my-proxy-host.com -systems.system-name.aws.clientConfig.ProxyPort=my-proxy-port +Through `KinesisSystemDescriptor`, you can also set the *proxy host* and *proxy port* to be used by the Kinesis Client: +{% highlight java %} +KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName) + .withProxyHost("YOUR-PROXY-HOST") + .withProxyPort(YOUR-PROXY-PORT); {% endhighlight %} ### Resetting Offsets @@ -109,14 +120,37 @@ These checkpoints are stored and managed by the KCL library internally. You can {% highlight jproperties %} // change the TableName to a unique name to reset checkpoints.
-systems.kinesis-system.streams.input0.aws.kcl.TableName=my-app-table-name +systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName=my-app-table-name +{% endhighlight %} + +Or through `KinesisInputDescriptor`: + +{% highlight java %} +KinesisInputDescriptor<KV<String, byte[]>> kid = ... + +Map<String, String> kclConfig = new HashMap<>(); +kclConfig.put("TableName", "my-new-app-table-name"); + +kid.withKCLConfig(kclConfig); {% endhighlight %} + When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream. {% highlight jproperties %} // set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest) -systems.kinesis-system.streams.input0.aws.kcl.InitialPositionInStream=LATEST +systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream=LATEST +{% endhighlight %} + +Or through `KinesisInputDescriptor`: + +{% highlight java %} +KinesisInputDescriptor<KV<String, byte[]>> kid = ... + +Map<String, String> kclConfig = new HashMap<>(); +kclConfig.put("InitialPositionInStream", "LATEST"); + +kid.withKCLConfig(kclConfig); {% endhighlight %} Alternatively, if you want to start from a particular offset in the Kinesis stream, you can log in to the [AWS console](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ConsoleDynamoDB.html) and edit the offsets in your DynamoDB Table.
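For reference, a descriptor is just a type-safe builder over Samza's flat configuration. Below is a sketch of the properties the basic Kinesis example in this patch would generate for a system named `kinesis` and a stream `STREAM-NAME` (key formats taken from the `KinesisConfig` constants in this commit; treat the exact keys as an implementation detail rather than a public contract):

{% highlight jproperties %}
systems.kinesis.samza.factory=org.apache.samza.system.kinesis.KinesisSystemFactory
streams.STREAM-NAME.samza.system=kinesis
systems.kinesis.streams.STREAM-NAME.aws.region=STREAM-REGION
systems.kinesis.streams.STREAM-NAME.aws.accessKey=YOUR-ACCESS_KEY
sensitive.systems.kinesis.streams.STREAM-NAME.aws.secretKey=YOUR-SECRET-KEY
{% endhighlight %}

This mirrors the key/value pairs that `TestKinesisInputDescriptor` and `TestKinesisSystemDescriptor` in this commit assert against the descriptors' `toConfig()` output.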
http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java index d11096f..e0c9099 100644 --- a/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/KinesisConfig.java @@ -54,20 +54,20 @@ import com.amazonaws.ClientConfiguration; public class KinesisConfig extends MapConfig { private static final Logger LOG = LoggerFactory.getLogger(KinesisConfig.class.getName()); - private static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region"; - private static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region"; + public static final String CONFIG_SYSTEM_REGION = "systems.%s.aws.region"; + public static final String CONFIG_STREAM_REGION = "systems.%s.streams.%s.aws.region"; - private static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey"; - private static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey"; + public static final String CONFIG_STREAM_ACCESS_KEY = "systems.%s.streams.%s.aws.accessKey"; + public static final String CONFIG_STREAM_SECRET_KEY = "sensitive.systems.%s.streams.%s.aws.secretKey"; - private static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig."; - private static final String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost"; - private static final String DEFAULT_CONFIG_PROXY_HOST = ""; - private static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort"; - private static final int DEFAULT_CONFIG_PROXY_PORT = 0; + public static final String CONFIG_AWS_CLIENT_CONFIG = "systems.%s.aws.clientConfig."; + public static final 
String CONFIG_PROXY_HOST = CONFIG_AWS_CLIENT_CONFIG + "ProxyHost"; + public static final String DEFAULT_CONFIG_PROXY_HOST = ""; + public static final String CONFIG_PROXY_PORT = CONFIG_AWS_CLIENT_CONFIG + "ProxyPort"; + public static final int DEFAULT_CONFIG_PROXY_PORT = 0; - private static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl."; - private static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl."; + public static final String CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.aws.kcl."; + public static final String CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG = "systems.%s.streams.%s.aws.kcl."; public KinesisConfig(Config config) { super(config); http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java new file mode 100644 index 0000000..1c2e0a2 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisInputDescriptor.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kinesis.descriptors; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.descriptors.InputDescriptor; +import org.apache.samza.system.descriptors.SystemDescriptor; +import org.apache.samza.system.kinesis.KinesisConfig; + + +/** + * A {@link KinesisInputDescriptor} can be used for specifying Samza and Kinesis specific properties of Kinesis + * input streams. + * <p> + * Use {@link KinesisSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor. + * <p> + * Stream properties provided in configuration override corresponding properties specified using a descriptor. + * + * @param <StreamMessageType> type of messages in this stream + */ +public class KinesisInputDescriptor<StreamMessageType> + extends InputDescriptor<StreamMessageType, KinesisInputDescriptor<StreamMessageType>> { + private Optional<String> accessKey = Optional.empty(); + private Optional<String> secretKey = Optional.empty(); + private Optional<String> region = Optional.empty(); + private Map<String, String> kclConfig = Collections.emptyMap(); + + + /** + * Constructs an {@link InputDescriptor} instance. 
+ * + * @param streamId id of the stream + * @param valueSerde serde the values in the messages in the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + <T> KinesisInputDescriptor(String streamId, Serde<T> valueSerde, SystemDescriptor systemDescriptor) { + super(streamId, KVSerde.of(new NoOpSerde<>(), valueSerde), systemDescriptor, null); + } + + /** + * Kinesis region for the system stream. + * @param region Kinesis region + * @return this input descriptor + */ + public KinesisInputDescriptor<StreamMessageType> withRegion(String region) { + this.region = Optional.of(StringUtils.stripToNull(region)); + return this; + } + + /** + * Kinesis access key name for the system stream. + * @param accessKey Kinesis access key name + * @return this input descriptor + */ + public KinesisInputDescriptor<StreamMessageType> withAccessKey(String accessKey) { + this.accessKey = Optional.of(StringUtils.stripToNull(accessKey)); + return this; + } + + /** + * Kinesis secret key name for the system stream. + * @param secretKey Kinesis secret key + * @return this input descriptor + */ + public KinesisInputDescriptor<StreamMessageType> withSecretKey(String secretKey) { + this.secretKey = Optional.of(StringUtils.stripToNull(secretKey)); + return this; + } + + /** + * KCL (Kinesis Client Library) config for the system stream. This is not required by default. 
+ * @param kclConfig A map of specified KCL configs + * @return this input descriptor + */ + public KinesisInputDescriptor<StreamMessageType> withKCLConfig(Map<String, String> kclConfig) { + this.kclConfig = kclConfig; + return this; + } + + @Override + public Map<String, String> toConfig() { + Map<String, String> config = new HashMap<>(super.toConfig()); + + String systemName = getSystemName(); + String streamId = getStreamId(); + String clientConfigPrefix = + String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamId); + + this.region.ifPresent( + val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamId), val)); + this.accessKey.ifPresent( + val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamId), val)); + this.secretKey.ifPresent( + val -> config.put(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamId), val)); + this.kclConfig.forEach((k, v) -> config.put(clientConfigPrefix + k, v)); + + return config; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java new file mode 100644 index 0000000..ffeb667 --- /dev/null +++ b/samza-aws/src/main/java/org/apache/samza/system/kinesis/descriptors/KinesisSystemDescriptor.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.kinesis.descriptors; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.operators.KV; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.descriptors.SystemDescriptor; +import org.apache.samza.system.kinesis.KinesisConfig; +import org.apache.samza.system.kinesis.KinesisSystemFactory; + + +/** + * A {@link KinesisSystemDescriptor} can be used for specifying Samza and Kinesis-specific properties of a Kinesis + * input system. It can also be used for obtaining {@link KinesisInputDescriptor}s, + * which can be used for specifying Samza and system-specific properties of Kinesis input streams. + * <p> + * System properties provided in configuration override corresponding properties specified using a descriptor. 
+ */ +public class KinesisSystemDescriptor extends SystemDescriptor<KinesisSystemDescriptor> { + private static final String FACTORY_CLASS_NAME = KinesisSystemFactory.class.getName(); + + private Optional<String> region = Optional.empty(); + private Optional<String> proxyHost = Optional.empty(); + private Optional<Integer> proxyPort = Optional.empty(); + private Map<String, String> awsConfig = Collections.emptyMap(); + private Map<String, String> kclConfig = Collections.emptyMap(); + + public KinesisSystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, null, null); + } + + /** + * Gets an {@link KinesisInputDescriptor} for the input stream of this system. + * <p> + * The message in the stream will have {@link String} keys and {@code ValueType} values. + * + * @param streamId id of the input stream + * @param valueSerde stream level serde for the values in the messages in the input stream + * @param <ValueType> type of the value in the messages in this stream + * @return an {@link KinesisInputDescriptor} for the Kinesis input stream + */ + public <ValueType> KinesisInputDescriptor<KV<String, ValueType>> getInputDescriptor(String streamId, + Serde<ValueType> valueSerde) { + return new KinesisInputDescriptor<>(streamId, valueSerde, this); + } + + /** + * Kinesis region for this system. + * @param region Kinesis region + * @return this system descriptor + */ + public KinesisSystemDescriptor withRegion(String region) { + this.region = Optional.of(StringUtils.stripToNull(region)); + return this; + } + + /** + * AWS config for this system. This is not required by default. + * @param awsConfig A map of specified AWS configs + * @return this system descriptor + */ + public KinesisSystemDescriptor withAWSConfig(Map<String, String> awsConfig) { + this.awsConfig = awsConfig; + return this; + } + + /** + * KCL (Kinesis Client Library) config for this system. This is not required by default. 
+ * @param kclConfig A map of specified KCL configs + * @return this system descriptor + */ + public KinesisSystemDescriptor withKCLConfig(Map<String, String> kclConfig) { + this.kclConfig = kclConfig; + return this; + } + + /** + * Proxy host to be used for this system. + * @param proxyHost Proxy host + * @return this system descriptor + */ + public KinesisSystemDescriptor withProxyHost(String proxyHost) { + this.proxyHost = Optional.of(StringUtils.stripToNull(proxyHost)); + return this; + } + + /** + * Proxy port to be used for this system. + * @param proxyPort Proxy port + * @return this system descriptor + */ + public KinesisSystemDescriptor withProxyPort(int proxyPort) { + this.proxyPort = Optional.of(proxyPort); + return this; + } + + @Override + public Map<String, String> toConfig() { + Map<String, String> config = new HashMap<>(super.toConfig()); + String systemName = getSystemName(); + + this.region.ifPresent( + val -> config.put(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName), val)); + this.proxyHost.ifPresent( + val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName), val)); + this.proxyPort.ifPresent( + val -> config.put(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName), String.valueOf(val))); + + final String kclConfigPrefix = String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName); + this.kclConfig.forEach((k, v) -> config.put(kclConfigPrefix + k, v)); + + final String awsConfigPrefix = String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName); + this.awsConfig.forEach((k, v) -> config.put(awsConfigPrefix + k, v)); + + return config; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java ---------------------------------------------------------------------- diff --git 
a/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java new file mode 100644 index 0000000..f0a8a3b --- /dev/null +++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisInputDescriptor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system.kinesis.descriptors; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.operators.KV; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.system.kinesis.KinesisConfig; +import org.junit.Assert; +import org.junit.Test; + + +public class TestKinesisInputDescriptor { + @Test + public void testConfigGeneration() { + String systemName = "kinesis"; + String streamName = "Seine"; + KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName); + Map<String, String> cliConfig = new HashMap<>(); + cliConfig.put("key1", "value1"); + KinesisInputDescriptor<KV<String, byte[]>> id = sd.getInputDescriptor(streamName, new NoOpSerde<byte[]>()) + .withRegion("Paris") + .withAccessKey("accessKey") + .withSecretKey("secretKey") + .withKCLConfig(cliConfig); + + Map<String, String> generatedConfig = id.toConfig(); + Assert.assertEquals(5, generatedConfig.size()); + + Assert.assertEquals(systemName, generatedConfig.get("streams.Seine.samza.system")); + Assert.assertEquals("Paris", + generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_REGION, systemName, streamName))); + Assert.assertEquals("accessKey", + generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_ACCESS_KEY, systemName, streamName))); + Assert.assertEquals("secretKey", + generatedConfig.get(String.format(KinesisConfig.CONFIG_STREAM_SECRET_KEY, systemName, streamName))); + Assert.assertEquals("value1", generatedConfig.get( + String.format(KinesisConfig.CONFIG_STREAM_KINESIS_CLIENT_LIB_CONFIG, systemName, streamName) + "key1")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java 
b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java new file mode 100644 index 0000000..f12dad1 --- /dev/null +++ b/samza-aws/src/test/java/org/apache/samza/system/kinesis/descriptors/TestKinesisSystemDescriptor.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system.kinesis.descriptors; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.samza.system.kinesis.KinesisConfig; +import org.apache.samza.system.kinesis.KinesisSystemFactory; +import org.junit.Assert; +import org.junit.Test; + + +public class TestKinesisSystemDescriptor { + @Test + public void testConfigGeneration() { + String systemName = "kinesis"; + Map<String, String> kclConfig = new HashMap<>(); + kclConfig.put("key1", "value1"); + Map<String, String> awsConfig = new HashMap<>(); + awsConfig.put("key2", "value2"); + + KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName).withRegion("London") + .withProxyHost("US") + .withProxyPort(1776) + .withAWSConfig(awsConfig) + .withKCLConfig(kclConfig); + + Map<String, String> generatedConfig = sd.toConfig(); + Assert.assertEquals(6, generatedConfig.size()); + + Assert.assertEquals(KinesisSystemFactory.class.getName(), generatedConfig.get("systems.kinesis.samza.factory")); + Assert.assertEquals("London", generatedConfig.get(String.format(KinesisConfig.CONFIG_SYSTEM_REGION, systemName))); + Assert.assertEquals("US", generatedConfig.get(String.format(KinesisConfig.CONFIG_PROXY_HOST, systemName))); + Assert.assertEquals("1776", generatedConfig.get(String.format(KinesisConfig.CONFIG_PROXY_PORT, systemName))); + Assert.assertEquals("value1", + generatedConfig.get(String.format(KinesisConfig.CONFIG_SYSTEM_KINESIS_CLIENT_LIB_CONFIG, systemName) + "key1")); + Assert.assertEquals("value2", + generatedConfig.get(String.format(KinesisConfig.CONFIG_AWS_CLIENT_CONFIG, systemName) + "key2")); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java 
b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java new file mode 100644 index 0000000..e3e3fa4 --- /dev/null +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsInputDescriptor.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.hdfs.descriptors; + +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.system.descriptors.InputDescriptor; +import org.apache.samza.system.descriptors.SystemDescriptor; + + +/** + * A {@link HdfsInputDescriptor} can be used for specifying Samza and HDFS specific properties of HDFS + * input streams. + * <p> + * Use {@link HdfsSystemDescriptor#getInputDescriptor} to obtain an instance of this descriptor. + * <p> + * Stream properties provided in configuration override corresponding properties specified using a descriptor. + * + */ +public class HdfsInputDescriptor + extends InputDescriptor<Object, HdfsInputDescriptor> { + + /** + * Constructs an {@link InputDescriptor} instance. Hdfs input has no key. Value type is determined by + * reader type (see {@link HdfsSystemDescriptor#withReaderType}). 
+ * + * @param streamId id of the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + HdfsInputDescriptor(String streamId, SystemDescriptor systemDescriptor) { + super(streamId, new NoOpSerde(), systemDescriptor, null); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java new file mode 100644 index 0000000..7b7e118 --- /dev/null +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsOutputDescriptor.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system.hdfs.descriptors; + +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.system.descriptors.OutputDescriptor; +import org.apache.samza.system.descriptors.SystemDescriptor; + +/** + * A {@link HdfsOutputDescriptor} can be used for specifying Samza and HDFS-specific properties of HDFS + * output streams. + * <p> + * Use {@link HdfsSystemDescriptor#getOutputDescriptor} to obtain an instance of this descriptor. + * <p> + * Stream properties provided in configuration override corresponding properties specified using a descriptor. + */ +public class HdfsOutputDescriptor + extends OutputDescriptor<Object, HdfsOutputDescriptor> { + + /** + * Constructs an {@link OutputDescriptor} instance. Hdfs output has no key. Value type is determined by + * writer class (see {@link HdfsSystemDescriptor#withWriterClassName}). + * + * @param streamId id of the stream + * @param systemDescriptor system descriptor this stream descriptor was obtained from + */ + HdfsOutputDescriptor(String streamId, SystemDescriptor systemDescriptor) { + super(streamId, new NoOpSerde(), systemDescriptor); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java new file mode 100644 index 0000000..f4d8566 --- /dev/null +++ b/samza-hdfs/src/main/java/org/apache/samza/system/hdfs/descriptors/HdfsSystemDescriptor.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.system.hdfs.descriptors; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.system.descriptors.SystemDescriptor; +import org.apache.samza.system.hdfs.HdfsConfig; +import org.apache.samza.system.hdfs.HdfsSystemFactory; + + +/** + * A {@link HdfsSystemDescriptor} can be used for specifying Samza and HDFS-specific properties of an HDFS + * input/output system. It can also be used for obtaining {@link HdfsInputDescriptor}s and + * {@link HdfsOutputDescriptor}s, which can be used for specifying Samza and system-specific properties of + * HDFS input/output streams. + * <p> + * System properties provided in configuration override corresponding properties specified using a descriptor.
+ */ +public class HdfsSystemDescriptor extends SystemDescriptor<HdfsSystemDescriptor> { + private static final String FACTORY_CLASS_NAME = HdfsSystemFactory.class.getName(); + + private Optional<String> datePathFormat = Optional.empty(); + private Optional<String> outputBaseDir = Optional.empty(); + private Optional<Long> writeBatchSizeBytes = Optional.empty(); + private Optional<Long> writeBatchSizeRecords = Optional.empty(); + private Optional<String> writeCompressionType = Optional.empty(); + private Optional<String> writerClass = Optional.empty(); + + private Optional<Long> consumerBufferCapacity = Optional.empty(); + private Optional<Long> consumerMaxRetries = Optional.empty(); + private Optional<String> consumerWhiteList = Optional.empty(); + private Optional<String> consumerBlackList = Optional.empty(); + private Optional<String> consumerGroupPattern = Optional.empty(); + private Optional<String> consumerReader = Optional.empty(); + private Optional<String> consumerStagingDirectory = Optional.empty(); + + public HdfsSystemDescriptor(String systemName) { + super(systemName, FACTORY_CLASS_NAME, null, null); + } + + /** + * Gets an {@link HdfsInputDescriptor} for the input stream of this system. + * <p> + * The message in the stream has no key and the value type is determined by reader type. + * + * @param streamId id of the input stream + * @return an {@link HdfsInputDescriptor} for the hdfs input stream + */ + public HdfsInputDescriptor getInputDescriptor(String streamId) { + return new HdfsInputDescriptor(streamId, this); + } + + /** + * Gets an {@link HdfsOutputDescriptor} for the output stream of this system. + * <p> + * The message in the stream has no key and the value type is determined by writer class. 
+ * + * @param streamId id of the output stream + * @return an {@link HdfsOutputDescriptor} for the hdfs output stream + */ + public HdfsOutputDescriptor getOutputDescriptor(String streamId) { + return new HdfsOutputDescriptor(streamId, this); + } + + /** + * In an HdfsWriter implementation that performs time-based output bucketing, + * the user may configure a date format (suitable for inclusion in a file path) + * using <code>SimpleDateFormat</code> formatting that the Bucketer implementation will + * use to generate HDFS paths and filenames. The more granular this date format, the more + * often a bucketing HdfsWriter will begin a new date-path bucket when creating the next output file. + * @param datePathFormat date path format + * @return this system descriptor + */ + public HdfsSystemDescriptor withDatePathFormat(String datePathFormat) { + this.datePathFormat = Optional.of(StringUtils.stripToNull(datePathFormat)); + return this; + } + + /** + * The base output directory into which all HDFS output for this job will be written. + * @param outputBaseDir output base directory + * @return this system descriptor + */ + public HdfsSystemDescriptor withOutputBaseDir(String outputBaseDir) { + this.outputBaseDir = Optional.of(StringUtils.stripToNull(outputBaseDir)); + return this; + } + + /** + * Split output files from all writer tasks based on # of bytes written to optimize + * MapReduce utilization for Hadoop jobs that will process the data later. + * @param writeBatchSizeBytes write batch size in bytes. + * @return this system descriptor + */ + public HdfsSystemDescriptor withWriteBatchSizeBytes(long writeBatchSizeBytes) { + this.writeBatchSizeBytes = Optional.of(writeBatchSizeBytes); + return this; + } + + /** + * Split output files from all writer tasks based on # of records written to optimize + * MapReduce utilization for Hadoop jobs that will process the data later. + * @param writeBatchSizeRecords write batch size in records.
+ * @return this system descriptor + */ + public HdfsSystemDescriptor withWriteBatchSizeRecords(long writeBatchSizeRecords) { + this.writeBatchSizeRecords = Optional.of(writeBatchSizeRecords); + return this; + } + + /** + * Simple, human-readable label for various compression options. HdfsWriter implementations + * can choose how to handle these individually, or throw an exception. Example: "none", "gzip", ... + * @param writeCompressionType compression type for writer. + * @return this system descriptor + */ + public HdfsSystemDescriptor withWriteCompressionType(String writeCompressionType) { + this.writeCompressionType = Optional.of(StringUtils.stripToNull(writeCompressionType)); + return this; + } + + /** + * The fully-qualified class name of the HdfsWriter subclass that will write for this system. + * @param writerClassName writer class name. + * @return this system descriptor + */ + public HdfsSystemDescriptor withWriterClassName(String writerClassName) { + this.writerClass = Optional.of(StringUtils.stripToNull(writerClassName)); + return this; + } + + /** + * The capacity of the hdfs consumer buffer - the blocking queue used for storing messages. + * @param bufferCapacity the buffer capacity for HDFS consumer. + * @return this system descriptor + */ + public HdfsSystemDescriptor withConsumerBufferCapacity(long bufferCapacity) { + this.consumerBufferCapacity = Optional.of(bufferCapacity); + return this; + } + + /** + * Number of max retries for the hdfs consumer readers per partition. + * @param maxRetries number of max retries for HDFS consumer. + * @return this system descriptor + */ + public HdfsSystemDescriptor withConsumerNumMaxRetries(long maxRetries) { + this.consumerMaxRetries = Optional.of(maxRetries); + return this; + } + + /** + * White list used by directory partitioner to filter out unwanted files in an HDFS directory. + * @param whiteList white list for HDFS consumer inputs.
+ * @return this system descriptor + */ + public HdfsSystemDescriptor withConsumerWhiteList(String whiteList) { + this.consumerWhiteList = Optional.of(StringUtils.stripToNull(whiteList)); + return this; + } + + /** + * Black list used by directory partitioner to filter out unwanted files in an HDFS directory. + * @param blackList black list for HDFS consumer inputs. + * @return this system descriptor + */ + public HdfsSystemDescriptor withConsumerBlackList(String blackList) { + this.consumerBlackList = Optional.of(StringUtils.stripToNull(blackList)); + return this; + } + + /** + * Group pattern used by directory partitioner for advanced partitioning. + * @param groupPattern group pattern for HDFS consumer inputs. + * @return this system descriptor + */ + public HdfsSystemDescriptor withConsumerGroupPattern(String groupPattern) { + this.consumerGroupPattern = Optional.of(StringUtils.stripToNull(groupPattern)); + return this; + } + + /** + * The type of the file reader for consumer (avro, plain, etc.) + * @param readerType reader type for HDFS consumer inputs. + * @return this system descriptor + */ + public HdfsSystemDescriptor withReaderType(String readerType) { + this.consumerReader = Optional.of(StringUtils.stripToNull(readerType)); + return this; + } + + /** + * Staging directory for storing partition description. If not set, will use the staging directory set + * by the YARN job. + * @param stagingDirectory staging directory for HDFS consumer inputs.
+ * @return this system descriptor + */ + public HdfsSystemDescriptor withStagingDirectory(String stagingDirectory) { + this.consumerStagingDirectory = Optional.of(StringUtils.stripToNull(stagingDirectory)); + return this; + } + + @Override + public Map<String, String> toConfig() { + Map<String, String> config = new HashMap<>(super.toConfig()); + String systemName = getSystemName(); + + this.datePathFormat.ifPresent( + val -> config.put(String.format(HdfsConfig.DATE_PATH_FORMAT_STRING(), systemName), val)); + this.outputBaseDir.ifPresent(val -> config.put(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName), val)); + this.writeBatchSizeBytes.ifPresent( + val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_BYTES(), systemName), String.valueOf(val))); + this.writeBatchSizeRecords.ifPresent( + val -> config.put(String.format(HdfsConfig.WRITE_BATCH_SIZE_RECORDS(), systemName), String.valueOf(val))); + this.writeCompressionType.ifPresent( + val -> config.put(String.format(HdfsConfig.COMPRESSION_TYPE(), systemName), val)); + this.writerClass.ifPresent(val -> config.put(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName), val)); + + this.consumerBufferCapacity.ifPresent( + val -> config.put(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName), String.valueOf(val))); + this.consumerMaxRetries.ifPresent( + val -> config.put(String.format(HdfsConfig.CONSUMER_NUM_MAX_RETRIES(), systemName), String.valueOf(val))); + this.consumerWhiteList.ifPresent( + val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName), val)); + this.consumerBlackList.ifPresent( + val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_BLACKLIST(), systemName), val)); + this.consumerGroupPattern.ifPresent( + val -> config.put(String.format(HdfsConfig.CONSUMER_PARTITIONER_GROUP_PATTERN(), systemName), val)); + this.consumerReader.ifPresent(val -> config.put(String.format(HdfsConfig.FILE_READER_TYPE(), systemName), val)); + 
this.consumerStagingDirectory.ifPresent( + val -> config.put(String.format(HdfsConfig.STAGING_DIRECTORY(), systemName), val)); + + return config; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/c5348bf6/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java ---------------------------------------------------------------------- diff --git a/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java new file mode 100644 index 0000000..78d85e9 --- /dev/null +++ b/samza-hdfs/src/test/java/org/apache/samza/system/hdfs/descriptors/TestHdfsSystemDescriptor.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.system.hdfs.descriptors; + +import java.util.Map; + +import org.apache.samza.system.hdfs.HdfsConfig; +import org.apache.samza.system.hdfs.HdfsSystemFactory; +import org.apache.samza.system.hdfs.writer.AvroDataFileHdfsWriter; +import org.junit.Assert; +import org.junit.Test; + + +public class TestHdfsSystemDescriptor { + @Test + public void testMajorConfigGeneration() { + String systemName = "hdfs"; + + HdfsSystemDescriptor sd = new HdfsSystemDescriptor(systemName).withConsumerBufferCapacity(950) + .withConsumerWhiteList(".*") + .withReaderType("avro") + .withOutputBaseDir("/home/output") + .withWriterClassName(AvroDataFileHdfsWriter.class.getName()); + sd.getInputDescriptor("input"); + + Map<String, String> generatedConfig = sd.toConfig(); + Assert.assertEquals(6, generatedConfig.size()); + System.out.println(generatedConfig); + + Assert.assertEquals(HdfsSystemFactory.class.getName(), generatedConfig.get("systems.hdfs.samza.factory")); + Assert.assertEquals("950", generatedConfig.get(String.format(HdfsConfig.CONSUMER_BUFFER_CAPACITY(), systemName))); + Assert.assertEquals(".*", + generatedConfig.get(String.format(HdfsConfig.CONSUMER_PARTITIONER_WHITELIST(), systemName))); + Assert.assertEquals("avro", generatedConfig.get(String.format(HdfsConfig.FILE_READER_TYPE(), systemName))); + Assert.assertEquals("/home/output", generatedConfig.get(String.format(HdfsConfig.BASE_OUTPUT_DIR(), systemName))); + Assert.assertEquals(AvroDataFileHdfsWriter.class.getName(), + generatedConfig.get(String.format(HdfsConfig.HDFS_WRITER_CLASS_NAME(), systemName))); + } +}
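The descriptors added in this patch all follow the same shape: fluent `with*()` setters capture optional settings in `Optional` fields, and `toConfig()` flattens only the populated ones into the flat key/value map Samza consumes. The following standalone sketch (not the Samza API; `ExampleSystemDescriptor` and its config keys are hypothetical) illustrates that pattern using only the JDK:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class DescriptorSketch {
  // Hypothetical stand-in for a system descriptor such as KinesisSystemDescriptor.
  static class ExampleSystemDescriptor {
    private final String systemName;
    private Optional<String> region = Optional.empty();
    private Optional<Integer> proxyPort = Optional.empty();

    ExampleSystemDescriptor(String systemName) {
      this.systemName = systemName;
    }

    ExampleSystemDescriptor withRegion(String region) {
      // ofNullable tolerates a null argument instead of throwing at set time
      this.region = Optional.ofNullable(region);
      return this;
    }

    ExampleSystemDescriptor withProxyPort(int port) {
      this.proxyPort = Optional.of(port);
      return this;
    }

    Map<String, String> toConfig() {
      Map<String, String> config = new HashMap<>();
      // Only settings the caller actually supplied end up in the config map,
      // so unset options fall through to Samza's configured/default values.
      region.ifPresent(v ->
          config.put(String.format("systems.%s.region", systemName), v));
      proxyPort.ifPresent(v ->
          config.put(String.format("systems.%s.proxy.port", systemName), String.valueOf(v)));
      return config;
    }
  }

  public static void main(String[] args) {
    Map<String, String> config = new ExampleSystemDescriptor("kinesis")
        .withRegion("us-east-1")
        .withProxyPort(1776)
        .toConfig();
    System.out.println(config.get("systems.kinesis.region"));     // us-east-1
    System.out.println(config.get("systems.kinesis.proxy.port")); // 1776
  }
}
```

This is also why the tests above assert an exact `generatedConfig.size()`: the map should contain one entry per setter invoked, plus the system factory entry contributed by the base `SystemDescriptor`.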
