This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4634311e751 [improve][io] Improve kinesis connector config. (#21004)
4634311e751 is described below

commit 4634311e75176f9acec16e37e2900945a8e2040e
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Aug 22 15:31:46 2023 +0800

    [improve][io] Improve kinesis connector config. (#21004)
---
 .../io/kinesis/AwsCredentialProviderPlugin.java    | 29 -----------
 .../io/kinesis/AwsDefaultProviderChainPlugin.java  | 30 ------------
 .../pulsar/io/kinesis/BaseKinesisConfig.java       |  7 +++
 .../org/apache/pulsar/io/kinesis/KinesisSink.java  | 17 +++----
 .../pulsar/io/kinesis/KinesisSinkConfig.java       | 18 ++++---
 .../apache/pulsar/io/kinesis/KinesisSource.java    | 17 +------
 .../pulsar/io/kinesis/KinesisSourceConfig.java     | 36 +++++++-------
 .../io/kinesis/STSAssumeRoleProviderPlugin.java    | 31 ------------
 .../pulsar/io/kinesis/KinesisSinkConfigTests.java  | 33 ++++---------
 .../io/kinesis/KinesisSourceConfigTests.java       | 56 ++++++----------------
 .../kinesis/src/test/resources/sinkConfig.yaml     | 27 -----------
 .../kinesis/src/test/resources/sourceConfig.yaml   | 32 -------------
 12 files changed, 71 insertions(+), 262 deletions(-)

diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
deleted file mode 100644
index e88a952293b..00000000000
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsCredentialProviderPlugin.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.kinesis;
-
-/**
- * This is a stub class for backwards compatibility.  In new code and 
configurations, please use the plugins
- * from org.apache.pulsar.io.aws
- *
- * @see org.apache.pulsar.io.aws.AwsCredentialProviderPlugin
- */
-@Deprecated
-public interface AwsCredentialProviderPlugin extends 
org.apache.pulsar.io.aws.AwsCredentialProviderPlugin {
-}
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
deleted file mode 100644
index 75952a71a29..00000000000
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/AwsDefaultProviderChainPlugin.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.kinesis;
-
-/**
- * This is a stub class for backwards compatibility.  In new code and 
configurations, please use the plugins
- * from org.apache.pulsar.io.aws
- *
- * @see org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
- */
-@Deprecated
-public class AwsDefaultProviderChainPlugin extends 
org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin
-        implements AwsCredentialProviderPlugin {
-}
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
index c9c951ae2b7..7bd95b0d6e3 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/BaseKinesisConfig.java
@@ -35,6 +35,13 @@ public abstract class BaseKinesisConfig implements 
Serializable {
     )
     private String awsEndpoint = "";
 
+    @FieldDoc(
+            required = false,
+            defaultValue = "",
+            help = "Cloudwatch end-point url. It can be found at "
+                    + 
"https://docs.aws.amazon.com/general/latest/gr/rande.html";
+    )
+    private String cloudwatchEndpoint = "";
 
     @FieldDoc(
         required = false,
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
index fb8eedff82f..d8e4e4bab85 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSink.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pulsar.io.kinesis;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.util.concurrent.Futures.addCallback;
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -49,7 +48,6 @@ import org.apache.pulsar.client.api.schema.GenericObject;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
-import org.apache.pulsar.io.common.IOConfigUtils;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.Connector;
@@ -155,17 +153,16 @@ public class KinesisSink extends AbstractAwsConnector 
implements Sink<GenericObj
     @Override
     public void open(Map<String, Object> config, SinkContext sinkContext) {
         scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
-        kinesisSinkConfig = IOConfigUtils.loadWithSecrets(config, 
KinesisSinkConfig.class, sinkContext);
+        kinesisSinkConfig = KinesisSinkConfig.load(config, sinkContext);
         this.sinkContext = sinkContext;
 
-        checkArgument(isNotBlank(kinesisSinkConfig.getAwsKinesisStreamName()), 
"empty kinesis-stream name");
-        checkArgument(isNotBlank(kinesisSinkConfig.getAwsEndpoint())
-                        || isNotBlank(kinesisSinkConfig.getAwsRegion()),
-                      "Either the aws-end-point or aws-region must be set");
-        
checkArgument(isNotBlank(kinesisSinkConfig.getAwsCredentialPluginParam()), 
"empty aws-credential param");
-
         KinesisProducerConfiguration kinesisConfig = new 
KinesisProducerConfiguration();
-        kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
+        if (isNotBlank(kinesisSinkConfig.getAwsEndpoint())) {
+            
kinesisConfig.setKinesisEndpoint(kinesisSinkConfig.getAwsEndpoint());
+        }
+        if (isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())) {
+            
kinesisConfig.setCloudwatchEndpoint(kinesisSinkConfig.getCloudwatchEndpoint());
+        }
         if (kinesisSinkConfig.getAwsEndpointPort() != null) {
             
kinesisConfig.setKinesisPort(kinesisSinkConfig.getAwsEndpointPort());
         }
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
index c5b26a26d0c..f81fd32134b 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSinkConfig.java
@@ -18,13 +18,14 @@
  */
 package org.apache.pulsar.io.kinesis;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import java.io.File;
-import java.io.IOException;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import java.io.Serializable;
+import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 
 @Data
@@ -103,9 +104,12 @@ public class KinesisSinkConfig extends BaseKinesisConfig 
implements Serializable
             help = "The maximum delay(in milliseconds) between retries.")
     private long retryMaxDelayInMillis = 60000;
 
-    public static KinesisSinkConfig load(String yamlFile) throws IOException {
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), KinesisSinkConfig.class);
+    public static KinesisSinkConfig load(Map<String, Object> config, 
SinkContext sinkContext) {
+        KinesisSinkConfig kinesisSinkConfig = 
IOConfigUtils.loadWithSecrets(config, KinesisSinkConfig.class, sinkContext);
+        checkArgument(isNotBlank(kinesisSinkConfig.getAwsRegion())
+                        || (isNotBlank(kinesisSinkConfig.getAwsEndpoint()) && 
isNotBlank(kinesisSinkConfig.getCloudwatchEndpoint())),
+                "Either \"awsRegion\" must be set OR all of [\"awsEndpoint\", 
\"cloudwatchEndpoint\"] must be set.");
+        return kinesisSinkConfig;
     }
 
     public enum MessageFormat {
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
index 2412244e1b5..279368db2a0 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSource.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.io.kinesis;
 
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import java.net.InetAddress;
 import java.util.Map;
 import java.util.UUID;
@@ -27,14 +25,12 @@ import java.util.concurrent.LinkedBlockingQueue;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.io.aws.AbstractAwsConnector;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
-import org.apache.pulsar.io.common.IOConfigUtils;
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
 import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
 import software.amazon.kinesis.common.ConfigsBuilder;
-import software.amazon.kinesis.common.InitialPositionInStream;
 import software.amazon.kinesis.coordinator.Scheduler;
 import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
 import software.amazon.kinesis.retrieval.RetrievalConfig;
@@ -68,18 +64,7 @@ public class KinesisSource extends AbstractAwsConnector 
implements Source<byte[]
 
     @Override
     public void open(Map<String, Object> config, SourceContext sourceContext) 
throws Exception {
-        this.kinesisSourceConfig = IOConfigUtils.loadWithSecrets(config, 
KinesisSourceConfig.class, sourceContext);
-
-        
checkArgument(isNotBlank(kinesisSourceConfig.getAwsKinesisStreamName()), "empty 
kinesis-stream name");
-        checkArgument(isNotBlank(kinesisSourceConfig.getAwsEndpoint())
-                        || isNotBlank(kinesisSourceConfig.getAwsRegion()),
-                     "Either the aws-end-point or aws-region must be set");
-        
checkArgument(isNotBlank(kinesisSourceConfig.getAwsCredentialPluginParam()), 
"empty aws-credential param");
-
-        if (kinesisSourceConfig.getInitialPositionInStream() == 
InitialPositionInStream.AT_TIMESTAMP) {
-            checkArgument((kinesisSourceConfig.getStartAtTime() != null), 
"Timestamp must be specified");
-        }
-
+        this.kinesisSourceConfig = KinesisSourceConfig.load(config, 
sourceContext);
         queue = new 
LinkedBlockingQueue<>(kinesisSourceConfig.getReceiveQueueSize());
         workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + 
UUID.randomUUID();
 
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
index f0bf7cfc978..0dd9bfce9e0 100644
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
+++ 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/KinesisSourceConfig.java
@@ -18,16 +18,17 @@
  */
 package org.apache.pulsar.io.kinesis;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
-import java.io.File;
-import java.io.IOException;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.Date;
+import java.util.Map;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import org.apache.pulsar.io.aws.AwsCredentialProviderPlugin;
+import org.apache.pulsar.io.common.IOConfigUtils;
+import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.FieldDoc;
 import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
 import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClientBuilder;
@@ -76,7 +77,7 @@ public class KinesisSourceConfig extends BaseKinesisConfig 
implements Serializab
 
     @FieldDoc(
         required = false,
-        defaultValue = "Apache Pulsar IO Connector",
+        defaultValue = "pulsar-kinesis",
         help = "Name of the Amazon Kinesis application. By default the 
application name is included "
                 + "in the user agent string used to make AWS requests. This 
can assist with troubleshooting "
                 + "(e.g. distinguish requests made by separate connectors 
instances)."
@@ -122,13 +123,6 @@ public class KinesisSourceConfig extends BaseKinesisConfig 
implements Serializab
     )
     private String dynamoEndpoint = "";
 
-    @FieldDoc(
-        required = false,
-        defaultValue = "",
-        help = "Cloudwatch end-point url. It can be found at 
https://docs.aws.amazon.com/general/latest/gr/rande.html";
-    )
-    private String cloudwatchEndpoint = "";
-
     @FieldDoc(
         required = false,
         defaultValue = "true",
@@ -136,10 +130,20 @@ public class KinesisSourceConfig extends 
BaseKinesisConfig implements Serializab
     )
     private boolean useEnhancedFanOut = true;
 
-
-    public static KinesisSourceConfig load(String yamlFile) throws IOException 
{
-        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
-        return mapper.readValue(new File(yamlFile), KinesisSourceConfig.class);
+    public static KinesisSourceConfig load(Map<String, Object> config, 
SourceContext sourceContext) {
+        KinesisSourceConfig kinesisSourceConfig = 
IOConfigUtils.loadWithSecrets(config,
+                KinesisSourceConfig.class, sourceContext);
+        boolean isNotBlankEndpoint = 
isNotBlank(kinesisSourceConfig.getAwsEndpoint())
+                && isNotBlank(kinesisSourceConfig.getCloudwatchEndpoint())
+                && isNotBlank(kinesisSourceConfig.getDynamoEndpoint());
+        checkArgument(isNotBlank(kinesisSourceConfig.getAwsRegion()) || 
isNotBlankEndpoint,
+                "Either \"awsRegion\" must be set OR all of "
+                        + "[ \"awsEndpoint\", \"cloudwatchEndpoint\", and 
\"dynamoEndpoint\" ] must be set.");
+        if (kinesisSourceConfig.getInitialPositionInStream() == 
InitialPositionInStream.AT_TIMESTAMP) {
+            checkArgument((kinesisSourceConfig.getStartAtTime() != null),
+                    "When initialPositionInStream is AT_TIMESTAMP, startAtTime 
must be specified");
+        }
+        return kinesisSourceConfig;
     }
 
     public KinesisAsyncClient 
buildKinesisAsyncClient(AwsCredentialProviderPlugin credPlugin) {
diff --git 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
 
b/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
deleted file mode 100644
index e305c9c9b9f..00000000000
--- 
a/pulsar-io/kinesis/src/main/java/org/apache/pulsar/io/kinesis/STSAssumeRoleProviderPlugin.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.io.kinesis;
-
-/**
- * This is a stub class for backwards compatibility.  In new code and 
configurations, please use the plugins
- * from org.apache.pulsar.io.aws
- *
- * @see org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
- */
-@Deprecated
-public class STSAssumeRoleProviderPlugin extends 
org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin
-        implements AwsCredentialProviderPlugin {
-}
-
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
index 6f76d9e69a2..a5051927ace 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSinkConfigTests.java
@@ -21,34 +21,16 @@ package org.apache.pulsar.io.kinesis;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.pulsar.io.common.IOConfigUtils;
 import org.apache.pulsar.io.core.SinkContext;
-import org.apache.pulsar.io.kinesis.KinesisSinkConfig.MessageFormat;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 public class KinesisSinkConfigTests {
 
-    @Test
-    public final void loadFromYamlFileTest() throws IOException {
-        File yamlFile = getFile("sinkConfig.yaml");
-        KinesisSinkConfig config = 
KinesisSinkConfig.load(yamlFile.getAbsolutePath());
-
-        assertNotNull(config);
-        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
-        assertEquals(config.getAwsRegion(), "us-east-1");
-        assertEquals(config.getAwsKinesisStreamName(), "my-stream");
-        assertEquals(config.getAwsCredentialPluginParam(),
-                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
-        assertEquals(config.getMessageFormat(), 
MessageFormat.ONLY_RAW_PAYLOAD);
-        assertEquals(true, config.isRetainOrdering());
-    }
-
     @Test
     public final void loadFromMapTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
@@ -58,7 +40,7 @@ public class KinesisSinkConfigTests {
         map.put("awsCredentialPluginParam", 
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
 
         SinkContext sinkContext = Mockito.mock(SinkContext.class);
-        KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, 
KinesisSinkConfig.class, sinkContext);
+        KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);
 
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
@@ -78,7 +60,7 @@ public class KinesisSinkConfigTests {
         SinkContext sinkContext = Mockito.mock(SinkContext.class);
         Mockito.when(sinkContext.getSecret("awsCredentialPluginParam"))
                 
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
-        KinesisSinkConfig config = IOConfigUtils.loadWithSecrets(map, 
KinesisSinkConfig.class, sinkContext);
+        KinesisSinkConfig config = KinesisSinkConfig.load(map, sinkContext);
 
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
@@ -88,8 +70,13 @@ public class KinesisSinkConfigTests {
                 "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
     }
 
-    private File getFile(String name) {
-        ClassLoader classLoader = getClass().getClassLoader();
-        return new File(classLoader.getResource(name).getFile());
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public final void missCloudWatchEndpointTest() {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws";);
+        map.put("awsKinesisStreamName", "my-stream");
+        map.put("awsCredentialPluginParam", 
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        SinkContext sinkContext = Mockito.mock(SinkContext.class);
+        KinesisSinkConfig.load(map, sinkContext); 
     }
 }
diff --git 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
index f6b0666d34b..4ba3593b1d9 100644
--- 
a/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
+++ 
b/pulsar-io/kinesis/src/test/java/org/apache/pulsar/io/kinesis/KinesisSourceConfigTests.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.io.kinesis;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
 
-import java.io.File;
 import java.io.IOException;
 import java.time.ZoneOffset;
 import java.time.ZonedDateTime;
@@ -30,7 +29,6 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.pulsar.io.common.IOConfigUtils;
 import org.apache.pulsar.io.core.SourceContext;
 import org.mockito.Mockito;
 import org.testng.annotations.Test;
@@ -54,30 +52,6 @@ public class KinesisSourceConfigTests {
         DAY = then.getTime();
     }
 
-    @Test
-    public final void loadFromYamlFileTest() throws IOException {
-        File yamlFile = getFile("sourceConfig.yaml");
-        KinesisSourceConfig config = 
KinesisSourceConfig.load(yamlFile.getAbsolutePath());
-        assertNotNull(config);
-        assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
-        assertEquals(config.getAwsRegion(), "us-east-1");
-        assertEquals(config.getAwsKinesisStreamName(), "my-stream");
-        assertEquals(config.getAwsCredentialPluginParam(),
-                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
-        assertEquals(config.getApplicationName(), "My test application");
-        assertEquals(config.getCheckpointInterval(), 30000);
-        assertEquals(config.getBackoffTime(), 4000);
-        assertEquals(config.getNumRetries(), 3);
-        assertEquals(config.getReceiveQueueSize(), 2000);
-        assertEquals(config.getInitialPositionInStream(), 
InitialPositionInStream.TRIM_HORIZON);
-
-        Calendar cal = Calendar.getInstance();
-        cal.setTime(config.getStartAtTime());
-        ZonedDateTime actual = ZonedDateTime.ofInstant(cal.toInstant(), 
ZoneOffset.UTC);
-        ZonedDateTime expected = ZonedDateTime.ofInstant(DAY.toInstant(), 
ZoneOffset.UTC);
-        assertEquals(actual, expected);
-    }
-
     @Test
     public final void loadFromMapTest() throws IOException {
         Map<String, Object> map = new HashMap<String, Object> ();
@@ -89,12 +63,11 @@ public class KinesisSourceConfigTests {
         map.put("backoffTime", "4000");
         map.put("numRetries", "3");
         map.put("receiveQueueSize", 2000);
-        map.put("applicationName", "My test application");
         map.put("initialPositionInStream", 
InitialPositionInStream.TRIM_HORIZON);
         map.put("startAtTime", DAY);
 
         SourceContext sourceContext = Mockito.mock(SourceContext.class);
-        KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map, 
KinesisSourceConfig.class, sourceContext);
+        KinesisSourceConfig config = KinesisSourceConfig.load(map, 
sourceContext);
 
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
@@ -102,7 +75,7 @@ public class KinesisSourceConfigTests {
         assertEquals(config.getAwsKinesisStreamName(), "my-stream");
         assertEquals(config.getAwsCredentialPluginParam(),
                 "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
-        assertEquals(config.getApplicationName(), "My test application");
+        assertEquals(config.getApplicationName(), "pulsar-kinesis");
         assertEquals(config.getCheckpointInterval(), 30000);
         assertEquals(config.getBackoffTime(), 4000);
         assertEquals(config.getNumRetries(), 3);
@@ -133,7 +106,7 @@ public class KinesisSourceConfigTests {
         SourceContext sourceContext = Mockito.mock(SourceContext.class);
         Mockito.when(sourceContext.getSecret("awsCredentialPluginParam"))
                 
.thenReturn("{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
-        KinesisSourceConfig config = IOConfigUtils.loadWithSecrets(map, 
KinesisSourceConfig.class, sourceContext);
+        KinesisSourceConfig config = KinesisSourceConfig.load(map, 
sourceContext);
 
         assertNotNull(config);
         assertEquals(config.getAwsEndpoint(), "https://some.endpoint.aws";);
@@ -156,19 +129,17 @@ public class KinesisSourceConfigTests {
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "empty aws-credential param")
+            expectedExceptionsMessageRegExp = "awsCredentialPluginParam cannot 
be null")
     public final void missingCredentialsTest() throws Exception {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("awsEndpoint", "https://some.endpoint.aws";);
         map.put("awsRegion", "us-east-1");
         map.put("awsKinesisStreamName", "my-stream");
-
-        KinesisSource source = new KinesisSource();
-        source.open(map, null);
+        KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class,
-            expectedExceptionsMessageRegExp = "Timestamp must be specified")
+            expectedExceptionsMessageRegExp = "When initialPositionInStream is 
AT_TIMESTAMP, startAtTime must be specified")
     public final void missingStartTimeTest() throws Exception {
         Map<String, Object> map = new HashMap<String, Object> ();
         map.put("awsEndpoint", "https://some.endpoint.aws";);
@@ -177,13 +148,16 @@ public class KinesisSourceConfigTests {
         map.put("awsCredentialPluginParam",
                 "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
         map.put("initialPositionInStream", 
InitialPositionInStream.AT_TIMESTAMP);
-
-        KinesisSource source = new KinesisSource();
-        source.open(map, null);
+        KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
     }
 
-    private File getFile(String name) {
-        ClassLoader classLoader = getClass().getClassLoader();
-        return new File(classLoader.getResource(name).getFile());
+    @Test(expectedExceptions = IllegalArgumentException.class)
+    public final void missCloudWatchEndpointTest() {
+        Map<String, Object> map = new HashMap<String, Object> ();
+        map.put("awsEndpoint", "https://some.endpoint.aws";);
+        map.put("awsKinesisStreamName", "my-stream");
+        map.put("awsCredentialPluginParam",
+                "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}");
+        KinesisSourceConfig.load(map, Mockito.mock(SourceContext.class));
     }
 }
diff --git a/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml 
b/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml
deleted file mode 100644
index 7d99db65d07..00000000000
--- a/pulsar-io/kinesis/src/test/resources/sinkConfig.yaml
+++ /dev/null
@@ -1,27 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-  "awsEndpoint" : "https://some.endpoint.aws";,
-  "awsRegion": "us-east-1",
-  "awsKinesisStreamName": "my-stream",
-  "awsCredentialPluginParam": 
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
-  "messageFormat": "ONLY_RAW_PAYLOAD",
-  "retainOrdering": "true"
-}
\ No newline at end of file
diff --git a/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml 
b/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml
deleted file mode 100644
index 64b564486c1..00000000000
--- a/pulsar-io/kinesis/src/test/resources/sourceConfig.yaml
+++ /dev/null
@@ -1,32 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-{
-  "awsEndpoint" : "https://some.endpoint.aws";,
-  "awsRegion": "us-east-1",
-  "awsKinesisStreamName": "my-stream",
-  "awsCredentialPluginParam": 
"{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
-  "applicationName": "My test application",
-  "checkpointInterval": "30000",
-  "backoffTime":"4000",
-  "numRetries":"3",
-  "receiveQueueSize": 2000,
-  "initialPositionInStream": "TRIM_HORIZON",
-  "startAtTime": "2019-03-05T19:28:58.000Z"
-}
\ No newline at end of file

Reply via email to