This is an automated email from the ASF dual-hosted git repository.

dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee91ef  [FLINK-21875][hotfix] Fix compilation error in IntelliJ. 
Removing reliance on shaded classes. (#15290)
9ee91ef is described below

commit 9ee91efa94c67f797b39a967af9671a1df3381b4
Author: Danny Cranmer <[email protected]>
AuthorDate: Fri Mar 19 21:32:13 2021 +0000

    [FLINK-21875][hotfix] Fix compilation error in IntelliJ. Removing reliance 
on shaded classes. (#15290)
---
 .../kinesis/testutils/KinesisPubsubClient.java     |  16 ++-
 .../registry/test/GSRKinesisPubsubClient.java      | 107 +++------------------
 2 files changed, 26 insertions(+), 97 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
index aa8782c..0d15656 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.function.Function;
 
 /**
  * Simple client to publish and retrieve messages, using the AWS Kinesis SDK 
and the Flink Kinesis
@@ -87,15 +88,24 @@ public class KinesisPubsubClient {
     }
 
     public void sendMessage(String topic, String msg) {
+        sendMessage(topic, msg.getBytes());
+    }
+
+    public void sendMessage(String topic, byte[] data) {
         PutRecordRequest putRecordRequest = new PutRecordRequest();
         putRecordRequest.setStreamName(topic);
         putRecordRequest.setPartitionKey("fakePartitionKey");
-        putRecordRequest.withData(ByteBuffer.wrap(msg.getBytes()));
+        putRecordRequest.withData(ByteBuffer.wrap(data));
         PutRecordResult putRecordResult = 
kinesisClient.putRecord(putRecordRequest);
         LOG.info("added record: {}", putRecordResult.getSequenceNumber());
     }
 
     public List<String> readAllMessages(String streamName) throws Exception {
+        return readAllMessages(streamName, String::new);
+    }
+
+    public <T> List<T> readAllMessages(String streamName, Function<byte[], T> 
deserialiser)
+            throws Exception {
         KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
         Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
         streamNamesWithLastSeenShardIds.put(streamName, null);
@@ -104,7 +114,7 @@ public class KinesisPubsubClient {
                 kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
         int maxRecordsToFetch = 10;
 
-        List<String> messages = new ArrayList<>();
+        List<T> messages = new ArrayList<>();
         // retrieve records from all shards
         for (StreamShardHandle ssh : 
shardListResult.getRetrievedShardListOfStream(streamName)) {
             String shardIterator = kinesisProxy.getShardIterator(ssh, 
"TRIM_HORIZON", null);
@@ -112,7 +122,7 @@ public class KinesisPubsubClient {
                     kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
             List<Record> aggregatedRecords = getRecordsResult.getRecords();
             for (Record record : aggregatedRecords) {
-                messages.add(new String(record.getData().array()));
+                messages.add(deserialiser.apply(record.getData().array()));
             }
         }
         return messages;
diff --git 
a/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
 
b/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
index 980655c..aa39c43 100644
--- 
a/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
+++ 
b/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
@@ -17,24 +17,8 @@
 
 package org.apache.flink.glue.schema.registry.test;
 
-import org.apache.flink.api.common.time.Deadline;
-import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
-import 
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
-import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
 
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.PutRecordRequest;
-import com.amazonaws.services.kinesis.model.PutRecordResult;
-import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput;
 import com.amazonaws.services.schemaregistry.common.AWSSerializerInput;
 import com.amazonaws.services.schemaregistry.deserializers.AWSDeserializer;
@@ -42,13 +26,9 @@ import 
com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer;
 import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
 import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
 import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
 
 import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -60,44 +40,13 @@ import java.util.UUID;
  * Connectors and Glue Schema Registry classes.
  */
 public class GSRKinesisPubsubClient {
-    private static final Logger LOG = 
LoggerFactory.getLogger(GSRKinesisPubsubClient.class);
-
-    private final AmazonKinesis kinesisClient;
-    private final Properties properties;
+    private final KinesisPubsubClient client;
 
     public GSRKinesisPubsubClient(Properties properties) {
-        this.kinesisClient = createClientWithCredentials(properties);
-        this.properties = properties;
-    }
-
-    public void createStream(String stream, int shards, Properties props) 
throws Exception {
-        try {
-            kinesisClient.describeStream(stream);
-            kinesisClient.deleteStream(stream);
-        } catch (ResourceNotFoundException rnfe) {
-            // Exception can be ignored
-        }
-
-        kinesisClient.createStream(stream, shards);
-        Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5));
-        while (deadline.hasTimeLeft()) {
-            try {
-                Thread.sleep(250);
-                if 
(kinesisClient.describeStream(stream).getStreamDescription().getShards().size()
-                        != shards) {
-                    continue;
-                }
-                break;
-            } catch (ResourceNotFoundException rnfe) {
-                // Exception can be ignored
-            }
-        }
+        this.client = new KinesisPubsubClient(properties);
     }
 
     public void sendMessage(String schema, String streamName, GenericRecord 
msg) {
-        PutRecordRequest putRecordRequest = new PutRecordRequest();
-        putRecordRequest.setStreamName(streamName);
-        putRecordRequest.setPartitionKey("fakePartitionKey");
         UUID schemaVersionId =
                 createSerializer()
                         .registerSchema(
@@ -107,42 +56,24 @@ public class GSRKinesisPubsubClient {
                                         .transportName(streamName)
                                         .build());
 
-        byte[] serializedData = createSerializer().serialize(msg, 
schemaVersionId);
-        putRecordRequest.withData(ByteBuffer.wrap(serializedData));
-        PutRecordResult putRecordResult = 
kinesisClient.putRecord(putRecordRequest);
-
-        LOG.info("added record: {}", putRecordResult.getSequenceNumber());
+        client.sendMessage(streamName, createSerializer().serialize(msg, 
schemaVersionId));
     }
 
     public List<Object> readAllMessages(String streamName) throws Exception {
-        KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
-        Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
-        streamNamesWithLastSeenShardIds.put(streamName, null);
-
-        GetShardListResult shardListResult =
-                kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
         AWSDeserializer awsDeserializer = createDeserializer();
-        int maxRecordsToFetch = 10;
 
-        List<Object> messages = new ArrayList<>();
-        // retrieve records from all shards
-        for (StreamShardHandle ssh : 
shardListResult.getRetrievedShardListOfStream(streamName)) {
-            String shardIterator = kinesisProxy.getShardIterator(ssh, 
"TRIM_HORIZON", null);
-            GetRecordsResult getRecordsResult =
-                    kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
-            
List<org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record>
-                    aggregatedRecords = getRecordsResult.getRecords();
-            for (Record record : aggregatedRecords) {
-                Object obj =
+        return client.readAllMessages(
+                streamName,
+                bytes ->
                         awsDeserializer.deserialize(
                                 AWSDeserializerInput.builder()
-                                        
.buffer(ByteBuffer.wrap(record.getData().array()))
+                                        .buffer(ByteBuffer.wrap(bytes))
                                         .transportName(streamName)
-                                        .build());
-                messages.add(obj);
-            }
-        }
-        return messages;
+                                        .build()));
+    }
+
+    public void createStream(String stream, int shards, Properties props) 
throws Exception {
+        client.createTopic(stream, shards, props);
     }
 
     private Map<String, Object> getSerDeConfigs() {
@@ -169,16 +100,4 @@ public class GSRKinesisPubsubClient {
                 
.credentialProvider(DefaultCredentialsProvider.builder().build())
                 .build();
     }
-
-    private static AmazonKinesis createClientWithCredentials(Properties props)
-            throws AmazonClientException {
-        AWSCredentialsProvider credentialsProvider = new 
EnvironmentVariableCredentialsProvider();
-        return AmazonKinesisClientBuilder.standard()
-                .withCredentials(credentialsProvider)
-                .withEndpointConfiguration(
-                        new AwsClientBuilder.EndpointConfiguration(
-                                
props.getProperty(ConsumerConfigConstants.AWS_ENDPOINT),
-                                "ca-central-1"))
-                .build();
-    }
 }

Reply via email to