Repository: flink
Updated Branches:
  refs/heads/master 4d41bd8fa -> 23d1cba72


[FLINK-3923] Minor cleanups and enhancements

This closes #2016


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23d1cba7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23d1cba7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23d1cba7

Branch: refs/heads/master
Commit: 23d1cba72859339bd3ee8f877b031353380c87fb
Parents: fc6dc48
Author: Robert Metzger <rmetz...@apache.org>
Authored: Fri May 27 14:42:39 2016 +0200
Committer: Robert Metzger <rmetz...@apache.org>
Committed: Fri May 27 22:34:32 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/connectors/kinesis.md       |  10 ++
 .../org/apache/flink/util/PropertiesUtil.java   | 100 +++++++++++++++++++
 .../connectors/kafka/FlinkKafkaConsumer08.java  |  12 +--
 .../kafka/internals/SimpleConsumerThread.java   |  14 +--
 .../connectors/kafka/util/KafkaUtils.java       |  59 -----------
 .../kinesis/FlinkKinesisProducer.java           |   9 +-
 .../kinesis/util/KinesisConfigUtil.java         |  49 ++++-----
 tools/create_release_files.sh                   |   4 +-
 8 files changed, 152 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/docs/apis/streaming/connectors/kinesis.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/connectors/kinesis.md b/docs/apis/streaming/connectors/kinesis.md
index df6e80e..923b898 100644
--- a/docs/apis/streaming/connectors/kinesis.md
+++ b/docs/apis/streaming/connectors/kinesis.md
@@ -41,6 +41,16 @@ To use the connector, add the following Maven dependency to your project:
 **The `flink-connector-kinesis{{ site.scala_version_suffix }}` has a dependency on code licensed under the [Amazon Software License](https://aws.amazon.com/asl/) (ASL).
 Linking to the flink-connector-kinesis will include ASL licensed code into your application.**
 
+The `flink-connector-kinesis{{ site.scala_version_suffix }}` artifact is not deployed to Maven Central as part of
+Flink releases because of this licensing issue. Therefore, you need to build the connector yourself from source.
+
+Download the Flink source or check it out from the git repository, then use the following Maven command to build the module:
+{% highlight bash %}
+mvn clean install -Pinclude-kinesis -DskipTests
+{% endhighlight %}
+
+
+
 Note that the streaming connectors are not part of the binary distribution.
 See linking with them for cluster execution [here]({{site.baseurl}}/apis/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 

http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
new file mode 100644
index 0000000..3d7a7e4
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/PropertiesUtil.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.slf4j.Logger;
+
+import java.util.Properties;
+
+/**
+ * Simple utilities for getting typed values from Properties.
+ */
+public class PropertiesUtil {
+
+       /**
+        * Get integer from properties.
+        * This method throws an exception if the integer is not valid.
+        *
+        * @param config Properties
+        * @param key key in Properties
+        * @param defaultValue default value if value is not set
+        * @return default or value of key
+        * @throws IllegalArgumentException
+        */
+       public static int getInt(Properties config, String key, int defaultValue) {
+               String val = config.getProperty(key);
+               if (val == null) {
+                       return defaultValue;
+               } else {
+                       try {
+                               return Integer.parseInt(val);
+                       } catch (NumberFormatException nfe) {
+                               throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+                                               "Entered value='" + val + "'. Default value='" + defaultValue + "'");
+                       }
+               }
+       }
+
+       /**
+        * Get long from properties.
+        * This method throws an exception if the long is not valid.
+        *
+        * @param config Properties
+        * @param key key in Properties
+        * @param defaultValue default value if value is not set
+        * @return default or value of key
+        * @throws IllegalArgumentException
+        */
+       public static long getLong(Properties config, String key, long defaultValue) {
+               String val = config.getProperty(key);
+               if (val == null) {
+                       return defaultValue;
+               } else {
+                       try {
+                               return Long.parseLong(val);
+                       } catch (NumberFormatException nfe) {
+                               throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
+                                               "Entered value='" + val + "'. Default value='" + defaultValue + "'");
+                       }
+               }
+       }
+
+       /**
+        * Get long from properties.
+        * This method only logs if the long is not valid.
+        *
+        * @param config Properties
+        * @param key key in Properties
+        * @param defaultValue default value if value is not set
+        * @return default or value of key
+        */
+       public static long getLong(Properties config, String key, long defaultValue, Logger logger) {
+               try {
+                       return getLong(config, key, defaultValue);
+               } catch (IllegalArgumentException iae) {
+                       logger.warn(iae.getMessage());
+                       return defaultValue;
+               }
+       }
+
+       // ------------------------------------------------------------------------
+       
+       /** Private default constructor to prevent instantiation */
+       private PropertiesUtil() {}
+}

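For reference, a minimal sketch of how the new utility behaves. The example class and property names below are illustrative, not part of this commit:

    import java.util.Properties;

    import org.apache.flink.util.PropertiesUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class PropertiesUtilExample {

        private static final Logger LOG = LoggerFactory.getLogger(PropertiesUtilExample.class);

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty("socket.timeout.ms", "30000");
            props.setProperty("auto.commit.interval.ms", "sixty-thousand");

            // Valid value: parsed and returned.
            int soTimeout = PropertiesUtil.getInt(props, "socket.timeout.ms", 10000);   // 30000

            // Key not set: the default is returned.
            long backoff = PropertiesUtil.getLong(props, "retry.backoff.ms", 100L);     // 100

            // Malformed value, Logger overload: logs a warning and falls back to the
            // default, where the two-argument variants throw IllegalArgumentException.
            long interval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000L, LOG);

            LOG.info("soTimeout={}, backoff={}, interval={}", soTimeout, backoff, interval);
        }
    }
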
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
index 48cc461..f9bfedf 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
@@ -36,6 +36,7 @@ import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
 import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper;
 import org.apache.flink.util.NetUtils;
+import org.apache.flink.util.PropertiesUtil;
 import org.apache.flink.util.SerializedValue;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -48,8 +49,7 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getLongFromConfig;
+import static org.apache.flink.util.PropertiesUtil.getInt;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -186,7 +186,7 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
                validateZooKeeperConfig(props);
 
                this.invalidOffsetBehavior = getInvalidOffsetBehavior(props);
-               this.autoCommitInterval = getLongFromConfig(props, "auto.commit.interval.ms", 60000);
+               this.autoCommitInterval = PropertiesUtil.getLong(props, "auto.commit.interval.ms", 60000);
 
                // Connect to a broker to get the partitions for all topics
                List<KafkaTopicPartition> partitionInfos = 
@@ -231,15 +231,15 @@ public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
         */
        public static List<KafkaTopicPartitionLeader> getPartitionsForTopic(List<String> topics, Properties properties) {
                String seedBrokersConfString = properties.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
-               final int numRetries = getIntFromConfig(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
+               final int numRetries = getInt(properties, GET_PARTITIONS_RETRIES_KEY, DEFAULT_GET_PARTITIONS_RETRIES);
 
                checkNotNull(seedBrokersConfString, "Configuration property %s not set", ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
                String[] seedBrokers = seedBrokersConfString.split(",");
                List<KafkaTopicPartitionLeader> partitions = new ArrayList<>();
 
                final String clientId = "flink-kafka-consumer-partition-lookup";
-               final int soTimeout = getIntFromConfig(properties, "socket.timeout.ms", 30000);
-               final int bufferSize = getIntFromConfig(properties, "socket.receive.buffer.bytes", 65536);
+               final int soTimeout = getInt(properties, "socket.timeout.ms", 30000);
+               final int bufferSize = getInt(properties, "socket.receive.buffer.bytes", 65536);
 
                Random rnd = new Random();
                retryLoop: for (int retry = 0; retry < numRetries; retry++) {

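As a sketch of where these keys enter: the same Properties object handed to the consumer constructor carries both the required connection settings and the optional socket tuning values parsed via getInt above. Topic, group, and broker addresses below are placeholders:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ConsumerConfigExample {

        public static FlinkKafkaConsumer08<String> create() {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "broker1:9092");
            props.setProperty("zookeeper.connect", "zookeeper1:2181");
            props.setProperty("group.id", "example-group");
            // Optional tuning keys, read through PropertiesUtil.getInt during
            // partition lookup; a malformed value fails fast with an
            // IllegalArgumentException that names the offending key.
            props.setProperty("socket.timeout.ms", "30000");
            props.setProperty("socket.receive.buffer.bytes", "65536");
            return new FlinkKafkaConsumer08<>("example-topic", new SimpleStringSchema(), props);
        }
    }
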
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
index 491ffad..35e491a 100644
--- a/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
+++ b/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/internals/SimpleConsumerThread.java
@@ -47,7 +47,7 @@ import java.util.Map;
 import java.util.Properties;
 
 import static java.util.Objects.requireNonNull;
-import static org.apache.flink.streaming.connectors.kafka.util.KafkaUtils.getIntFromConfig;
+import static org.apache.flink.util.PropertiesUtil.getInt;
 
 /**
 * This class implements a thread with a connection to a single Kafka broker. The thread
@@ -117,12 +117,12 @@ class SimpleConsumerThread<T> extends Thread {
                this.invalidOffsetBehavior = invalidOffsetBehavior;
                
                // these are the actual configuration values of Kafka + their original default values.
-               this.soTimeout = getIntFromConfig(config, "socket.timeout.ms", 30000);
-               this.minBytes = getIntFromConfig(config, "fetch.min.bytes", 1);
-               this.maxWait = getIntFromConfig(config, "fetch.wait.max.ms", 100);
-               this.fetchSize = getIntFromConfig(config, "fetch.message.max.bytes", 1048576);
-               this.bufferSize = getIntFromConfig(config, "socket.receive.buffer.bytes", 65536);
-               this.reconnectLimit = getIntFromConfig(config, "flink.simple-consumer-reconnectLimit", 3);
+               this.soTimeout = getInt(config, "socket.timeout.ms", 30000);
+               this.minBytes = getInt(config, "fetch.min.bytes", 1);
+               this.maxWait = getInt(config, "fetch.wait.max.ms", 100);
+               this.fetchSize = getInt(config, "fetch.message.max.bytes", 1048576);
+               this.bufferSize = getInt(config, "socket.receive.buffer.bytes", 65536);
+               this.reconnectLimit = getInt(config, "flink.simple-consumer-reconnectLimit", 3);
        }
 
        public ClosableBlockingQueue<KafkaTopicPartitionState<TopicAndPartition>> getNewPartitionsQueue() {

http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java b/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
deleted file mode 100644
index fc07247..0000000
--- a/flink-streaming-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/util/KafkaUtils.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.connectors.kafka.util;
-
-import java.util.Properties;
-
-/**
- * Simple utilities, used by the Flink Kafka Consumers.
- */
-public class KafkaUtils {
-
-       public static int getIntFromConfig(Properties config, String key, int defaultValue) {
-               String val = config.getProperty(key);
-               if (val == null) {
-                       return defaultValue;
-               } else {
-                       try {
-                               return Integer.parseInt(val);
-                       } catch (NumberFormatException nfe) {
-                               throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
-                                               "Entered value='" + val + "'. Default value='" + defaultValue + "'");
-                       }
-               }
-       }
-
-       public static long getLongFromConfig(Properties config, String key, long defaultValue) {
-               String val = config.getProperty(key);
-               if (val == null) {
-                       return defaultValue;
-               } else {
-                       try {
-                               return Long.parseLong(val);
-                       } catch (NumberFormatException nfe) {
-                               throw new IllegalArgumentException("Value for configuration key='" + key + "' is not set correctly. " +
-                                               "Entered value='" + val + "'. Default value='" + defaultValue + "'");
-                       }
-               }
-       }
-       
-       // ------------------------------------------------------------------------
-       
-       /** Private default constructor to prevent instantiation */
-       private KafkaUtils() {}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
index 1b4b37b..2f2b5fe 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducer.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.connectors.kinesis.serialization.KinesisSerial
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 import org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
+import org.apache.flink.util.PropertiesUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -173,12 +174,12 @@ public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
                producerConfig.setRegion(configProps.getProperty(KinesisConfigConstants.CONFIG_AWS_REGION));
                producerConfig.setCredentialsProvider(AWSUtil.getCredentialsProvider(configProps));
                if (configProps.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT)) {
-                       producerConfig.setCollectionMaxCount(
-                                       Long.parseLong(configProps.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT)));
+                       producerConfig.setCollectionMaxCount(PropertiesUtil.getLong(configProps,
+                                       KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, producerConfig.getCollectionMaxCount(), LOG));
                }
                if (configProps.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT)) {
-                       producerConfig.setAggregationMaxCount(
-                                       Long.parseLong(configProps.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT)));
+                       producerConfig.setAggregationMaxCount(PropertiesUtil.getLong(configProps,
+                                       KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, producerConfig.getAggregationMaxCount(), LOG));
                }
 
                producer = new KinesisProducer(producerConfig);

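The practical effect: a malformed count no longer surfaces as a raw NumberFormatException when the sink opens; PropertiesUtil.getLong logs a warning and keeps the KinesisProducerConfiguration default. A sketch, with the import path of KinesisConfigConstants and the SerializationSchema-based constructor assumed from the connector layout of the time:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
    import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class ProducerConfigExample {

        public static FlinkKinesisProducer<String> create() {
            Properties config = new Properties();
            config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
            // With PropertiesUtil.getLong, a value such as "oops" here would be
            // logged and ignored in favor of the KPL default, instead of killing
            // the job with a NumberFormatException in open().
            config.setProperty(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, "500");
            return new FlinkKinesisProducer<>(new SimpleStringSchema(), config);
        }
    }
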
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
index e2f2e37..042b168 100644
--- a/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
+++ b/flink-streaming-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/KinesisConfigUtil.java
@@ -98,43 +98,38 @@ public class KinesisConfigUtil {
                        }
                }
 
-               if (config.containsKey(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES)) {
-                       try {
-                               Integer.parseInt(config.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES));
-                       } catch (NumberFormatException e) {
-                               throw new IllegalArgumentException("Invalid value given for describeStream stream operation retry count. Must be a valid integer value.");
-                       }
-               }
+               validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES,
+                               "Invalid value given for describeStream stream operation retry count. Must be a valid integer value.");
 
-               if (config.containsKey(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF)) {
-                       try {
-                               Long.parseLong(config.getProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF));
-                       } catch (NumberFormatException e) {
-                               throw new IllegalArgumentException("Invalid value given for describeStream stream operation backoff milliseconds. Must be a valid long value.");
-                       }
-               }
+               validateOptionalIntProperty(config, KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET,
+                               "Invalid value given for maximum records per getRecords shard operation. Must be a valid integer value.");
 
-               if (config.containsKey(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET)) {
-                       try {
-                               Integer.parseInt(config.getProperty(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET));
-                       } catch (NumberFormatException e) {
-                               throw new IllegalArgumentException("Invalid value given for maximum records per getRecords shard operation. Must be a valid integer value.");
-                       }
-               }
+               validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF,
+                               "Invalid value given for describeStream stream operation backoff milliseconds. Must be a valid long value.");
 
-               if (config.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT)) {
+               validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT,
+                               "Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid long value.");
+
+               validateOptionalLongProperty(config, KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT,
+                               "Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid long value.");
+       }
+
+       private static void validateOptionalLongProperty(Properties config, String key, String message) {
+               if (config.containsKey(key)) {
                        try {
-                               Long.parseLong(config.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT));
+                               Long.parseLong(config.getProperty(key));
                        } catch (NumberFormatException e) {
-                               throw new IllegalArgumentException("Invalid value given for maximum number of items to pack into a PutRecords request. Must be a valid long value.");
+                               throw new IllegalArgumentException(message);
                        }
                }
+       }
 
-               if (config.containsKey(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT)) {
+       private static void validateOptionalIntProperty(Properties config, String key, String message) {
+               if (config.containsKey(key)) {
                        try {
-                               Long.parseLong(config.getProperty(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT));
+                               Integer.parseInt(config.getProperty(key));
                        } catch (NumberFormatException e) {
-                               throw new IllegalArgumentException("Invalid value given for maximum number of items to pack into an aggregated record. Must be a valid long value.");
+                               throw new IllegalArgumentException(message);
                        }
                }
        }

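For orientation, a sketch of a Properties object that satisfies the optional checks above; required region and credential settings are abbreviated and all values are illustrative:

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kinesis.config.KinesisConfigConstants;

    public class KinesisTuningExample {

        public static Properties tuned() {
            Properties config = new Properties();
            config.setProperty(KinesisConfigConstants.CONFIG_AWS_REGION, "us-east-1");
            // Validated with Integer.parseInt: absent keys are skipped entirely,
            // present-but-malformed values abort with an IllegalArgumentException.
            config.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_RETRIES, "3");
            config.setProperty(KinesisConfigConstants.CONFIG_SHARD_RECORDS_PER_GET, "100");
            // Validated with Long.parseLong:
            config.setProperty(KinesisConfigConstants.CONFIG_STREAM_DESCRIBE_BACKOFF, "1000");
            config.setProperty(KinesisConfigConstants.CONFIG_PRODUCER_COLLECTION_MAX_COUNT, "500");
            config.setProperty(KinesisConfigConstants.CONFIG_PRODUCER_AGGREGATION_MAX_COUNT, "4096");
            return config;
        }
    }
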
http://git-wip-us.apache.org/repos/asf/flink/blob/23d1cba7/tools/create_release_files.sh
----------------------------------------------------------------------
diff --git a/tools/create_release_files.sh b/tools/create_release_files.sh
index f529add..91fce78 100755
--- a/tools/create_release_files.sh
+++ b/tools/create_release_files.sh
@@ -182,7 +182,7 @@ deploy_to_maven() {
   # are depending on scala 2.10.
   echo "Deploying Scala 2.10 version"
   cd tools && ./change-scala-version.sh 2.10 && cd ..
-  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source,include-kinesis --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
 
 
   echo "Deploying Scala 2.10 / hadoop 1 version"
@@ -190,7 +190,7 @@ deploy_to_maven() {
 
 
   sleep 4
-  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source,include-kinesis --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
+  $MVN clean deploy -Dgpg.executable=$GPG -Prelease,docs-and-source --settings deploysettings.xml -DskipTests -Dgpg.keyname=$GPG_KEY -Dgpg.passphrase=$GPG_PASSPHRASE -DretryFailedDeploymentCount=10
 }
 
 copy_data() {
