CrynetLogistics commented on a change in pull request #18553:
URL: https://github.com/apache/flink/pull/18553#discussion_r802714499
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/KinesisFirehoseSinkITCase.java
##########
@@ -78,42 +76,36 @@
@Before
public void setup() throws Exception {
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false");
- httpClient =
AWSServicesTestUtils.createHttpClient(mockFirehoseContainer.getEndpoint());
- s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(),
httpClient);
- firehoseAsyncClient =
getFirehoseClient(mockFirehoseContainer.getEndpoint(), httpClient);
- iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(),
httpClient);
+ s3HttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+ firehoseHttpClient =
createHttpClient(mockFirehoseContainer.getEndpoint());
+ iamHttpClient = createHttpClient(mockFirehoseContainer.getEndpoint());
+ s3AsyncClient = createS3Client(mockFirehoseContainer.getEndpoint(),
s3HttpClient);
+ firehoseAsyncClient =
+ getFirehoseClient(mockFirehoseContainer.getEndpoint(),
firehoseHttpClient);
+ iamAsyncClient = createIamClient(mockFirehoseContainer.getEndpoint(),
iamHttpClient);
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
}
@After
public void teardown() {
System.clearProperty(SdkSystemSetting.CBOR_ENABLED.property());
- AWSGeneralUtil.closeResources(
- httpClient, s3AsyncClient, firehoseAsyncClient,
iamAsyncClient);
+ s3HttpClient.close();
+ firehoseHttpClient.close();
+ iamHttpClient.close();
+ s3AsyncClient.close();
+ firehoseAsyncClient.close();
+ iamAsyncClient.close();
Review comment:
Four hours ago I intended to. I've since reassessed and do not plan to implement this
due to lack of time.
##########
File path:
flink-connectors/flink-connector-aws-kinesis-firehose/src/test/java/org/apache/flink/connector/firehose/sink/testutils/KinesisFirehoseTestUtils.java
##########
@@ -73,4 +81,22 @@ public static void createDeliveryStream(
firehoseAsyncClient.createDeliveryStream(request);
deliveryStream.get();
}
+
+ public static DataStream<String> getSampleDataGenerator(
+ StreamExecutionEnvironment env, int endValue) {
+ ObjectMapper mapper = new ObjectMapper();
+ return env.fromSequence(1, endValue)
+ .map(Object::toString)
+ .returns(String.class)
+ .map(data -> mapper.writeValueAsString(ImmutableMap.of("data",
data)));
+ }
+
+ public static List<String> getSampleDataGenerated(int endValue) throws
JsonProcessingException {
Review comment:
Four hours ago I intended to. I've since reassessed and do not plan to implement this
due to lack of time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]