Repository: flink
Updated Branches:
  refs/heads/master 4d41bd8fa -> 23d1cba72
[FLINK-3923] Minor cleanups and enhancements

This closes #2016


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23d1cba7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23d1cba7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23d1cba7

Branch: refs/heads/master
Commit: 23d1cba72859339bd3ee8f877b031353380c87fb
Parents: fc6dc48
Author: Robert Metzger <rmetz...@apache.org>
Authored: Fri May 27 14:42:39 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Fri May 27 22:34:32 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kinesis.md       |  10 ++
 .../org/apache/flink/util/PropertiesUtil.java   | 100 +++++++++++++++++++
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  12 +--
 .../kafka/internals/SimpleConsumerThread.java   |  14 +--
 .../connectors/kafka/util/KafkaUtils.java       |  59 -----------
 .../kinesis/FlinkKinesisProducer.java           |   9 +-
 .../kinesis/util/KinesisConfigUtil.java         |  49 ++++-----
 tools/create_release_files.sh                   |   4 +-
 8 files changed, 152 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
index df6e80e..923b898 100644
--- a/docs/apis/streaming/connectors/kinesis.md
+++ b/docs/apis/streaming/connectors/kinesis.md
@@ -41,6 +41,16 @@ To use the connector, add the following Maven dependency to your project:
 
 **The `flink-connector-kinesis{{ site.scala_version_suffix }}` connector has a dependency on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL). Linking to flink-connector-kinesis will include ASL-licensed code in your application.**
 
+The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact is not deployed to Maven central as part of
+Flink releases because of the licensing issue. Therefore, you need to build the connector yourself from source.
+
+Download the Flink source or check it out from the git repository. Then, use the following Maven command to build the module:
+{% highlight bash %}
+mvn clean install -Pinclude-kinesis -DskipTests
+{% endhighlight %}
+
+
+
 Note that the streaming connectors are not part of the binary distribution.
 See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
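For reference, once the module has been installed into the local Maven repository, a project picks it up with a regular dependency entry. A minimal sketch, assuming Scala 2.10 binaries and the 1.1-SNAPSHOT version produced by a master checkout (adjust both to match the source that was built):

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kinesis_2.10</artifactId>
      <version>1.1-SNAPSHOT</version>
    </dependency>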
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
new file mode 100644
index 0000000..3d7a7e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+
+import java.util.Properties;
+
+/**
+ * Simple utilities for getting typed values from Properties.
+ */
+public class PropertiesUtil {
+
+	/**
+	 * Get integer from properties.
+	 * This method throws an exception if the integer is not valid.
+	 *
+	 * @param config Properties
+	 * @param key key in Properties
+	 * @param defaultValue default value if value is not set
+	 * @return default or value of key
+	 * @throws IllegalArgumentException if the value is set but cannot be parsed
+	 */
+	public static int getInt(Properties config, String key, int defaultValue) {
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			try {
+				return Integer.parseInt(val);
+			} catch (NumberFormatException nfe) {
+				throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+						"Entered value='" + val + "'. Default value='" + defaultValue + "'");
+			}
+		}
+	}
+
+	/**
+	 * Get long from properties.
+	 * This method throws an exception if the long is not valid.
+	 *
+	 * @param config Properties
+	 * @param key key in Properties
+	 * @param defaultValue default value if value is not set
+	 * @return default or value of key
+	 * @throws IllegalArgumentException if the value is set but cannot be parsed
+	 */
+	public static long getLong(Properties config, String key, long defaultValue) {
+		String val = config.getProperty(key);
+		if (val == null) {
+			return defaultValue;
+		} else {
+			try {
+				return Long.parseLong(val);
+			} catch (NumberFormatException nfe) {
+				throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+						"Entered value='" + val + "'. Default value='" + defaultValue + "'");
+			}
+		}
+	}
+
+	/**
+	 * Get long from properties.
+	 * This method only logs a warning if the long is not valid and returns the default instead.
+	 *
+	 * @param config Properties
+	 * @param key key in Properties
+	 * @param defaultValue default value if value is not set
+	 * @param logger logger to warn on if the value cannot be parsed
+	 * @return default or value of key
+	 */
+	public static long getLong(Properties config, String key, long defaultValue, Logger logger) {
+		try {
+			return getLong(config, key, defaultValue);
+		} catch (IllegalArgumentException iae) {
+			logger.warn(iae.getMessage());
+			return defaultValue;
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	/** Private default constructor to prevent instantiation. */
+	private PropertiesUtil() {}
+}
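The behavior of the new utility is easy to pin down with a small driver. A minimal sketch using only the class above (the property keys and values are made up for illustration):

    import java.util.Properties;

    import org.apache.flink.util.PropertiesUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PropertiesUtilExample {

        private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtilExample.class);

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("fetch.min.bytes", "1024");
            props.setProperty("auto.commit.interval.ms", "not-a-number");

            // valid value: parsed and returned
            int minBytes = PropertiesUtil.getInt(props, "fetch.min.bytes", 1);                    // 1024
            // unset key: the default is returned
            long backoff = PropertiesUtil.getLong(props, "some.unset.key", 500L);                 // 500
            // malformed value, logger variant: warns and falls back to the default
            // instead of throwing IllegalArgumentException like the variant without a Logger
            long interval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000L, LOG); // 60000

            System.out.println(minBytes + " " + backoff + " " + interval);
        }
    }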
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 48cc461..f9bfedf 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -48,8 +49,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getLongFromConfig;
+import static org.apache.flink.util.PropertiesUtil.getInt;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -186,7 +186,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 		validateZooKeeperConfig(props);
 
 		this.invalidOffsetBehavior = getInvalidOffsetBehavior(props);
-		this.autoCommitInterval = getLongFromConfig(props, "auto.commit.interval.ms", 60000);
+		this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
 
 		// Connect to a broker to get the partitions for all topics
 		List<KafkaTopicPartition> partitionInfos =
@@ -231,15 +231,15 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
 	 */
 	public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
 		String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-		final int numRetries = getIntFromConfig(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
+		final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
 
 		checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
 		String[] seedBrokers = seedBrokersConfString.split(",");
 		List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
 
 		final String clientId = "flink-kafka-consumer-partition-lookup";
-		final int soTimeout = getIntFromConfig(properties, "socket.timeout.ms", 30000);
-		final int bufferSize = getIntFromConfig(properties, "socket.receive.buffer.bytes", 65536);
+		final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
+		final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
 
 		Random rnd = new Random();
 		retryLoop: for (int retry = 0; retry < numRetries; retry++) {
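The keys touched above are plain entries in the Properties object handed to the consumer. A minimal setup sketch, assuming the usual required settings of the 0.8 consumer (addresses and names are placeholders):

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");  // used for partition lookup
    props.setProperty("zookeeper.connect", "zk1:2181");      // checked by validateZooKeeperConfig()
    props.setProperty("group.id", "my-flink-group");
    // optional: now read via PropertiesUtil.getLong() with a default of 60000
    props.setProperty("auto.commit.interval.ms", "30000");

    FlinkKafkaConsumer08<String> consumer =
        new FlinkKafkaConsumer08<>("my-topic", new SimpleStringSchema(), props);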
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 491ffad..35e491a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -47,7 +47,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
+import static org.apache.flink.util.PropertiesUtil.getInt;
 
 /**
  * This class implements a thread with a connection to a single Kafka broker. The thread
@@ -117,12 +117,12 @@ class SimpleConsumerThread<T> extends Thread {
 		this.invalidOffsetBehavior = invalidOffsetBehavior;
 
 		// these are the actual configuration values of Kafka + their original default values.
-		this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000);
-		this.minBytes = getIntFromConfig(config, "fetch.min.bytes", 1);
-		this.maxWait = getIntFromConfig(config, "fetch.wait.max.ms", 100);
-		this.fetchSize = getIntFromConfig(config, "fetch.message.max.bytes", 1048576);
-		this.bufferSize = getIntFromConfig(config, "socket.receive.buffer.bytes", 65536);
-		this.reconnectLimit = getIntFromConfig(config, "flink.simple-consumer-reconnectLimit", 3);
+		this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
+		this.minBytes = getInt(config, "fetch.min.bytes", 1);
+		this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
+		this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
+		this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
+		this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
 	}
 
 	public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {
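All six keys read here can be overridden through the same Properties object passed to the consumer; only flink.simple-consumer-reconnectLimit is Flink-specific, the rest mirror Kafka's own setting names and defaults. A short sketch (the values are illustrative):

    Properties config = new Properties();
    config.setProperty("fetch.message.max.bytes", "2097152");        // default 1048576
    config.setProperty("fetch.wait.max.ms", "500");                  // default 100
    config.setProperty("flink.simple-consumer-reconnectLimit", "5"); // Flink-specific, default 3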
Default value='" + defaultValue + "'"); - } - } - } - - public static long getLongFromConfig(Properties config, String key, long defaultValue) { - String val = config.getProperty(key); - if (val == null) { - return defaultValue; - } else { - try { - return Long.parseLong(val); - } catch (NumberFormatException nfe) { - throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " + - "Entered value='" + val + "'. Default value='" + defaultValue + "'"); - } - } - } - - // ------------------------------------------------------------------------ - - /** Private default constructor to prevent instantiation */ - private KafkaUtils() {} -} http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java index 1b4b37b..2f2b5fe 100644 --- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java +++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerial import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil; import org.apache.flink.streaming.util.serialization.SerializationSchema; +import org.apache.flink.util.PropertiesUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -173,12 +174,12 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> { producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION)); producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps)); if (configProps.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT)) { - producerConfig.setCollectionMaxCount( - Long.parseLong(configProps.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT))); + producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps, + KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG)); } if (configProps.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT)) { - producerConfig.setAggregationMaxCount( - Long.parseLong(configProps.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT))); + producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps, + KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG)); } producer = new KinesisProducer(producerConfig); http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java ---------------------------------------------------------------------- diff --git 
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index e2f2e37..042b168 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -98,43 +98,38 @@ public class KinesisConfigUtil {
 			}
 		}
 
-		if (config.containsKey(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES)) {
-			try {
-				Integer.parseInt(config.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES));
-			} catch (NumberFormatException e) {
-				throw new IllegalArgumentException("Invalid value given for describeStream stream operation retry count. Must be a valid integer value.");
-			}
-		}
+		validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES,
+			"Invalid value given for describeStream stream operation retry count. Must be a valid integer value.");
 
-		if (config.containsKey(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF)) {
-			try {
-				Long.parseLong(config.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF));
-			} catch (NumberFormatException e) {
-				throw new IllegalArgumentException("Invalid value given for describeStream stream operation backoff milliseconds. Must be a valid long value.");
-			}
-		}
+		validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET,
+			"Invalid value given for maximum records per getRecords shard operation. Must be a valid integer value.");
 
-		if (config.containsKey(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET)) {
-			try {
-				Integer.parseInt(config.getProperty(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET));
-			} catch (NumberFormatException e) {
-				throw new IllegalArgumentException("Invalid value given for maximum records per getRecords shard operation. Must be a valid integer value.");
-			}
-		}
+		validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF,
+			"Invalid value given for describeStream stream operation backoff milliseconds. Must be a valid long value.");
 
-		if (config.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT)) {
+		validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT,
+			"Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid long value.");
+
+		validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT,
+			"Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid long value.");
+	}
+
+	private static void validateOptionalLongProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
 			try {
-				Long.parseLong(config.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT));
+				Long.parseLong(config.getProperty(key));
 			} catch (NumberFormatException e) {
-				throw new IllegalArgumentException("Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid long value.");
+				throw new IllegalArgumentException(message);
 			}
 		}
+	}
 
-		if (config.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT)) {
+	private static void validateOptionalIntProperty(Properties config, String key, String message) {
+		if (config.containsKey(key)) {
 			try {
-				Long.parseLong(config.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT));
+				Integer.parseInt(config.getProperty(key));
 			} catch (NumberFormatException e) {
-				throw new IllegalArgumentException("Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid long value.");
+				throw new IllegalArgumentException(message);
 			}
 		}
 	}
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/tools/create_release_files.sh
----------------------------------------------------------------------
diff --git a/tools/create_release_files.sh b/tools/create_release_files.sh
index f529add..91fce78 100755
--- a/tools/create_release_files.sh
+++ b/tools/create_release_files.sh
@@ -182,7 +182,7 @@ deploy_to_maven() {
   # are depending on scala 2.10.
   echo "Deploying Scala 2.10 version"
   cd tools && ./change-scala-version.sh 2.10 && cd ..
-  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source,include-kinesis --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
 
 
   echo "Deploying Scala 2.10 / hadoop 1 version"
@@ -190,7 +190,7 @@ deploy_to_maven() {
 
   sleep 4
 
-  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source,include-kinesis --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
 }
 
 copy_data() {
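With the include-kinesis profile removed from the deploy calls, the connector is no longer published and has to be built locally, as the documentation change above describes. A sketch of the build, assuming a Flink source checkout (the -pl/-am variant narrows the build to the connector module and its dependencies):

    # full build with the Kinesis connector enabled
    mvn clean install -Pinclude-kinesis -DskipTests

    # or: build only the connector module and what it requires
    mvn clean install -Pinclude-kinesis -DskipTests \
        -pl flink-streaming-connectors/flink-connector-kinesis -am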