This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 5438659 [Pulsar-Flink] Extends Validations (#3063)
5438659 is described below
commit 5438659ae6b8b911ed1c85b0645c92229a85b472
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Tue Nov 27 02:30:30 2018 +0000
[Pulsar-Flink] Extends Validations (#3063)
### Motivation
This PR extends the argument validation in the Pulsar Flink connector.
### Modifications
1- The `FlinkPulsarProducer` constructor needs to be robust against
**blank** values.
2- `PulsarSourceBuilder` needs to be robust against **blank** values.
3- The `totalMessageCount` variable looks redundant, so it is removed
from `PulsarConsumerSource`.
4- Unit-test coverage is added for `PulsarSourceBuilder`.
### Test Coverage
New unit tests are added in `PulsarSourceBuilderTest`.
---
.../connectors/pulsar/FlinkPulsarProducer.java | 8 +-
.../connectors/pulsar/PulsarConsumerSource.java | 3 -
.../connectors/pulsar/PulsarSourceBuilder.java | 16 ++--
.../connectors/pulsar/PulsarSourceBuilderTest.java | 104 +++++++++++++++++++++
4 files changed, 119 insertions(+), 12 deletions(-)
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
index 48bc0f1..bced297 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/FlinkPulsarProducer.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.java.ClosureCleaner;
@@ -40,6 +41,7 @@ import org.slf4j.LoggerFactory;
import java.util.function.Function;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -121,8 +123,10 @@ public class FlinkPulsarProducer<IN>
SerializationSchema<IN> serializationSchema,
ProducerConfiguration producerConfig,
PulsarKeyExtractor<IN> keyExtractor) {
- this.serviceUrl = checkNotNull(serviceUrl, "Service url not set");
- this.defaultTopicName = checkNotNull(defaultTopicName, "TopicName not
set");
+ checkArgument(StringUtils.isNotBlank(serviceUrl), "Service url cannot
be blank");
+ checkArgument(StringUtils.isNotBlank(defaultTopicName), "TopicName
cannot be blank");
+ this.serviceUrl = serviceUrl;
+ this.defaultTopicName = defaultTopicName;
this.schema = checkNotNull(serializationSchema, "Serialization Schema
not set");
this.producerConfig = checkNotNull(producerConfig, "Producer Config is
not set");
this.flinkPulsarKeyExtractor = getOrNullKeyExtractor(keyExtractor);
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
index 0d01def..84e0e50 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSource.java
@@ -66,7 +66,6 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
private final long acknowledgementBatchSize;
private long batchCount;
- private long totalMessageCount;
private transient volatile boolean isRunning;
@@ -147,14 +146,12 @@ class PulsarConsumerSource<T> extends
MessageAcknowledgingSourceBase<T, MessageI
return;
}
context.collect(deserialize(message));
- totalMessageCount++;
}
}
private void emitAutoAcking(SourceContext<T> context, Message message)
throws IOException {
context.collect(deserialize(message));
batchCount++;
- totalMessageCount++;
if (batchCount >= acknowledgementBatchSize) {
LOG.info("processed {} messages acknowledging messageId {}",
batchCount, message.getMessageId());
consumer.acknowledgeCumulative(message.getMessageId());
diff --git
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
index 3396983..9605f07 100644
---
a/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
+++
b/pulsar-flink/src/main/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilder.java
@@ -18,6 +18,7 @@
*/
package org.apache.flink.streaming.connectors.pulsar;
+import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -29,9 +30,9 @@ import org.apache.flink.util.Preconditions;
@PublicEvolving
public class PulsarSourceBuilder<T> {
- static final String SERVICE_URL = "pulsar://localhost:6650";
- static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
- static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
+ private static final String SERVICE_URL = "pulsar://localhost:6650";
+ private static final long ACKNOWLEDGEMENT_BATCH_SIZE = 100;
+ private static final long MAX_ACKNOWLEDGEMENT_BATCH_SIZE = 1000;
final DeserializationSchema<T> deserializationSchema;
String serviceUrl = SERVICE_URL;
@@ -50,7 +51,7 @@ public class PulsarSourceBuilder<T> {
* @return this builder
*/
public PulsarSourceBuilder<T> serviceUrl(String serviceUrl) {
- Preconditions.checkNotNull(serviceUrl);
+ Preconditions.checkArgument(StringUtils.isNotBlank(serviceUrl),
"serviceUrl cannot be blank");
this.serviceUrl = serviceUrl;
return this;
}
@@ -66,7 +67,7 @@ public class PulsarSourceBuilder<T> {
* @return this builder
*/
public PulsarSourceBuilder<T> topic(String topic) {
- Preconditions.checkNotNull(topic);
+ Preconditions.checkArgument(StringUtils.isNotBlank(topic), "topic
cannot be blank");
this.topic = topic;
return this;
}
@@ -78,7 +79,8 @@ public class PulsarSourceBuilder<T> {
* @return this builder
*/
public PulsarSourceBuilder<T> subscriptionName(String subscriptionName) {
- Preconditions.checkNotNull(subscriptionName);
+ Preconditions.checkArgument(StringUtils.isNotBlank(subscriptionName),
+ "subscriptionName cannot be blank");
this.subscriptionName = subscriptionName;
return this;
}
@@ -112,7 +114,7 @@ public class PulsarSourceBuilder<T> {
* @return a builder
*/
public static <T> PulsarSourceBuilder<T> builder(DeserializationSchema<T>
deserializationSchema) {
- Preconditions.checkNotNull(deserializationSchema);
+ Preconditions.checkNotNull(deserializationSchema,
"deserializationSchema cannot be null");
return new PulsarSourceBuilder<>(deserializationSchema);
}
}
diff --git
a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
new file mode 100644
index 0000000..5a916e8
--- /dev/null
+++
b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarSourceBuilderTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flink.streaming.connectors.pulsar;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Tests for PulsarSourceBuilder
+ */
+public class PulsarSourceBuilderTest {
+
+ private PulsarSourceBuilder pulsarSourceBuilder;
+
+ @Before
+ public void before() {
+ pulsarSourceBuilder = PulsarSourceBuilder.builder(new
TestDeserializationSchema());
+ }
+
+ @Test
+ public void testBuild() {
+ SourceFunction sourceFunction = pulsarSourceBuilder
+ .serviceUrl("testServiceUrl")
+ .topic("testTopic")
+ .subscriptionName("testSubscriptionName")
+ .build();
+ Assert.assertNotNull(sourceFunction);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testBuildWithoutSettingRequiredProperties() {
+ pulsarSourceBuilder.build();
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testServiceUrlWithNull() {
+ pulsarSourceBuilder.serviceUrl(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testServiceUrlWithBlank() {
+ pulsarSourceBuilder.serviceUrl(" ");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicWithNull() {
+ pulsarSourceBuilder.topic(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTopicWithBlank() {
+ pulsarSourceBuilder.topic(" ");
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSubscriptionNameWithNull() {
+ pulsarSourceBuilder.subscriptionName(null);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testSubscriptionNameWithBlank() {
+ pulsarSourceBuilder.subscriptionName(" ");
+ }
+
+ private class TestDeserializationSchema<T> implements
DeserializationSchema<T> {
+
+ @Override
+ public T deserialize(byte[] bytes) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean isEndOfStream(T t) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return null;
+ }
+ }
+}