This is an automated email from the ASF dual-hosted git repository.
dannycranmer pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9ee91ef [FLINK-21875][hotfix] Fix compilation error in IntelliJ.
Removing reliance on shaded classes. (#15290)
9ee91ef is described below
commit 9ee91efa94c67f797b39a967af9671a1df3381b4
Author: Danny Cranmer <[email protected]>
AuthorDate: Fri Mar 19 21:32:13 2021 +0000
[FLINK-21875][hotfix] Fix compilation error in IntelliJ. Removing reliance
on shaded classes. (#15290)
---
.../kinesis/testutils/KinesisPubsubClient.java | 16 ++-
.../registry/test/GSRKinesisPubsubClient.java | 107 +++------------------
2 files changed, 26 insertions(+), 97 deletions(-)
diff --git
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
index aa8782c..0d15656 100644
---
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
+++
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
@@ -45,6 +45,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.function.Function;
/**
* Simple client to publish and retrieve messages, using the AWS Kinesis SDK
and the Flink Kinesis
@@ -87,15 +88,24 @@ public class KinesisPubsubClient {
}
public void sendMessage(String topic, String msg) {
+ sendMessage(topic, msg.getBytes());
+ }
+
+ public void sendMessage(String topic, byte[] data) {
PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setStreamName(topic);
putRecordRequest.setPartitionKey("fakePartitionKey");
- putRecordRequest.withData(ByteBuffer.wrap(msg.getBytes()));
+ putRecordRequest.withData(ByteBuffer.wrap(data));
PutRecordResult putRecordResult =
kinesisClient.putRecord(putRecordRequest);
LOG.info("added record: {}", putRecordResult.getSequenceNumber());
}
public List<String> readAllMessages(String streamName) throws Exception {
+ return readAllMessages(streamName, String::new);
+ }
+
+ public <T> List<T> readAllMessages(String streamName, Function<byte[], T>
deserialiser)
+ throws Exception {
KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
streamNamesWithLastSeenShardIds.put(streamName, null);
@@ -104,7 +114,7 @@ public class KinesisPubsubClient {
kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
int maxRecordsToFetch = 10;
- List<String> messages = new ArrayList<>();
+ List<T> messages = new ArrayList<>();
// retrieve records from all shards
for (StreamShardHandle ssh :
shardListResult.getRetrievedShardListOfStream(streamName)) {
String shardIterator = kinesisProxy.getShardIterator(ssh,
"TRIM_HORIZON", null);
@@ -112,7 +122,7 @@ public class KinesisPubsubClient {
kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
List<Record> aggregatedRecords = getRecordsResult.getRecords();
for (Record record : aggregatedRecords) {
- messages.add(new String(record.getData().array()));
+ messages.add(deserialiser.apply(record.getData().array()));
}
}
return messages;
diff --git
a/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
b/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
index 980655c..aa39c43 100644
---
a/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
+++
b/flink-end-to-end-tests/flink-glue-schema-registry-test/src/main/java/org/apache/flink/glue/schema/registry/test/GSRKinesisPubsubClient.java
@@ -17,24 +17,8 @@
package org.apache.flink.glue.schema.registry.test;
-import org.apache.flink.api.common.time.Deadline;
-import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
-import
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
-import
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
-import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
-import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
-import
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+import
org.apache.flink.streaming.connectors.kinesis.testutils.KinesisPubsubClient;
-import com.amazonaws.AmazonClientException;
-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
-import com.amazonaws.client.builder.AwsClientBuilder;
-import com.amazonaws.services.kinesis.AmazonKinesis;
-import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
-import com.amazonaws.services.kinesis.model.PutRecordRequest;
-import com.amazonaws.services.kinesis.model.PutRecordResult;
-import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.schemaregistry.common.AWSDeserializerInput;
import com.amazonaws.services.schemaregistry.common.AWSSerializerInput;
import com.amazonaws.services.schemaregistry.deserializers.AWSDeserializer;
@@ -42,13 +26,9 @@ import
com.amazonaws.services.schemaregistry.serializers.avro.AWSAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.avro.generic.GenericRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import java.nio.ByteBuffer;
-import java.time.Duration;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -60,44 +40,13 @@ import java.util.UUID;
* Connectors and Glue Schema Registry classes.
*/
public class GSRKinesisPubsubClient {
- private static final Logger LOG =
LoggerFactory.getLogger(GSRKinesisPubsubClient.class);
-
- private final AmazonKinesis kinesisClient;
- private final Properties properties;
+ private final KinesisPubsubClient client;
public GSRKinesisPubsubClient(Properties properties) {
- this.kinesisClient = createClientWithCredentials(properties);
- this.properties = properties;
- }
-
- public void createStream(String stream, int shards, Properties props)
throws Exception {
- try {
- kinesisClient.describeStream(stream);
- kinesisClient.deleteStream(stream);
- } catch (ResourceNotFoundException rnfe) {
- // Exception can be ignored
- }
-
- kinesisClient.createStream(stream, shards);
- Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5));
- while (deadline.hasTimeLeft()) {
- try {
- Thread.sleep(250);
- if
(kinesisClient.describeStream(stream).getStreamDescription().getShards().size()
- != shards) {
- continue;
- }
- break;
- } catch (ResourceNotFoundException rnfe) {
- // Exception can be ignored
- }
- }
+ this.client = new KinesisPubsubClient(properties);
}
public void sendMessage(String schema, String streamName, GenericRecord
msg) {
- PutRecordRequest putRecordRequest = new PutRecordRequest();
- putRecordRequest.setStreamName(streamName);
- putRecordRequest.setPartitionKey("fakePartitionKey");
UUID schemaVersionId =
createSerializer()
.registerSchema(
@@ -107,42 +56,24 @@ public class GSRKinesisPubsubClient {
.transportName(streamName)
.build());
- byte[] serializedData = createSerializer().serialize(msg,
schemaVersionId);
- putRecordRequest.withData(ByteBuffer.wrap(serializedData));
- PutRecordResult putRecordResult =
kinesisClient.putRecord(putRecordRequest);
-
- LOG.info("added record: {}", putRecordResult.getSequenceNumber());
+ client.sendMessage(streamName, createSerializer().serialize(msg,
schemaVersionId));
}
public List<Object> readAllMessages(String streamName) throws Exception {
- KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
- Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
- streamNamesWithLastSeenShardIds.put(streamName, null);
-
- GetShardListResult shardListResult =
- kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
AWSDeserializer awsDeserializer = createDeserializer();
- int maxRecordsToFetch = 10;
- List<Object> messages = new ArrayList<>();
- // retrieve records from all shards
- for (StreamShardHandle ssh :
shardListResult.getRetrievedShardListOfStream(streamName)) {
- String shardIterator = kinesisProxy.getShardIterator(ssh,
"TRIM_HORIZON", null);
- GetRecordsResult getRecordsResult =
- kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
-
List<org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record>
- aggregatedRecords = getRecordsResult.getRecords();
- for (Record record : aggregatedRecords) {
- Object obj =
+ return client.readAllMessages(
+ streamName,
+ bytes ->
awsDeserializer.deserialize(
AWSDeserializerInput.builder()
-
.buffer(ByteBuffer.wrap(record.getData().array()))
+ .buffer(ByteBuffer.wrap(bytes))
.transportName(streamName)
- .build());
- messages.add(obj);
- }
- }
- return messages;
+ .build()));
+ }
+
+ public void createStream(String stream, int shards, Properties props)
throws Exception {
+ client.createTopic(stream, shards, props);
}
private Map<String, Object> getSerDeConfigs() {
@@ -169,16 +100,4 @@ public class GSRKinesisPubsubClient {
.credentialProvider(DefaultCredentialsProvider.builder().build())
.build();
}
-
- private static AmazonKinesis createClientWithCredentials(Properties props)
- throws AmazonClientException {
- AWSCredentialsProvider credentialsProvider = new
EnvironmentVariableCredentialsProvider();
- return AmazonKinesisClientBuilder.standard()
- .withCredentials(credentialsProvider)
- .withEndpointConfiguration(
- new AwsClientBuilder.EndpointConfiguration(
-
props.getProperty(ConsumerConfigConstants.AWS_ENDPOINT),
- "ca-central-1"))
- .build();
- }
}