This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4bae48b Re-enable kinesis test (#7960)
4bae48b is described below
commit 4bae48b091b9d7e5ed03755f72cd1854d62f8d4b
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Jan 7 10:19:41 2022 +0530
Re-enable kinesis test (#7960)
* Enable Kinesis test
* Upgrade LocalStack version and change Kinesis LocalStack port
* Add hostname provider and old LocalStack Docker version
* Fix linting: reduce line length
* Roll back to LocalStack version 0.12.15
* Remove unused import
* Revert to default port
---
pinot-integration-tests/pom.xml | 2 +-
.../tests/RealtimeKinesisIntegrationTest.java | 48 +++++++++++-----------
2 files changed, 24 insertions(+), 26 deletions(-)
diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 217d90a..3af1214 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -34,7 +34,7 @@
<properties>
<pinot.root>${basedir}/..</pinot.root>
- <localstack-utils.version>0.2.15</localstack-utils.version>
+ <localstack-utils.version>0.2.19</localstack-utils.version>
<awaitility.version>3.0.0</awaitility.version>
<aws.sdk.version>2.14.28</aws.sdk.version>
</properties>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
index ed9ec88..dc93ca2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.integration.tests;
import cloud.localstack.Localstack;
+import cloud.localstack.ServiceName;
import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
import cloud.localstack.docker.annotation.LocalstackDockerProperties;
@@ -83,8 +84,7 @@ import
software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.AttributeMap;
-@LocalstackDockerProperties(services = {"kinesis"})
-@Test(enabled = false)
+@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag =
"0.12.15")
public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSet {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
@@ -189,22 +189,21 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
String streamType = "kinesis";
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_TOPIC_NAME),
- STREAM_NAME);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_TOPIC_NAME),
+ STREAM_NAME);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
"30000");
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
- StreamConfig.ConsumerType.LOWLEVEL.toString());
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
- KinesisConsumerFactory.class.getName());
- streamConfigMap
- .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_DECODER_CLASS),
- "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ StreamConfig.ConsumerType.LOWLEVEL.toString());
+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
KinesisConsumerFactory.class.getName());
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
StreamConfigProperties.STREAM_DECODER_CLASS),
+ "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
streamConfigMap.put(KinesisConfig.REGION, REGION);
streamConfigMap.put(KinesisConfig.MAX_RECORDS_TO_FETCH,
String.valueOf(MAX_RECORDS_TO_FETCH));
streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE,
ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
@@ -212,8 +211,8 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
streamConfigMap.put(KinesisConfig.ACCESS_KEY,
getLocalAWSCredentials().resolveCredentials().accessKeyId());
streamConfigMap.put(KinesisConfig.SECRET_KEY,
getLocalAWSCredentials().resolveCredentials().secretAccessKey());
streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS,
Integer.toString(200));
- streamConfigMap.put(StreamConfigProperties
- .constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
return streamConfigMap;
}
@@ -225,8 +224,8 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
_localstackDocker.startup(dockerConfig);
_kinesisClient = KinesisClient.builder().httpClient(new
ApacheSdkHttpService().createHttpClientBuilder()
- .buildWithDefaults(
-
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
Boolean.TRUE).build()))
+ .buildWithDefaults(
+
AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES,
Boolean.TRUE).build()))
.credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
.endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
@@ -278,8 +277,8 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
.partitionKey(data.get("Origin").textValue()).build();
PutRecordResponse putRecordResponse =
_kinesisClient.putRecord(putRecordRequest);
if (putRecordResponse.sdkHttpResponse().statusCode() == 200) {
- if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) &&
StringUtils
- .isNotBlank(putRecordResponse.shardId())) {
+ if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) &&
StringUtils.isNotBlank(
+ putRecordResponse.shardId())) {
_totalRecordsPushedInStream++;
int fieldIndex = 1;
@@ -321,9 +320,8 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
throws Exception {
Assert.assertNotEquals(_totalRecordsPushedInStream, 0);
- ResultSet pinotResultSet = getPinotConnection()
- .execute(new Request("sql", "SELECT * FROM " + getTableName() + "
ORDER BY Origin LIMIT 10000"))
- .getResultSet(0);
+ ResultSet pinotResultSet = getPinotConnection().execute(
+ new Request("sql", "SELECT * FROM " + getTableName() + " ORDER BY
Origin LIMIT 10000")).getResultSet(0);
Assert.assertNotEquals(pinotResultSet.getRowCount(), 0);
@@ -440,8 +438,8 @@ public class RealtimeKinesisIntegrationTest extends
BaseClusterIntegrationTestSe
}
}
- _h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" +
StringUtil
- .join(",", _h2FieldNameAndTypes.toArray(new
String[_h2FieldNameAndTypes.size()])) + ")").execute();
+ _h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" +
StringUtil.join(",",
+ _h2FieldNameAndTypes.toArray(new String[_h2FieldNameAndTypes.size()]))
+ ")").execute();
}
@AfterClass(enabled = false)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]