This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bae48b  Re-enable kinesis test (#7960)
4bae48b is described below

commit 4bae48b091b9d7e5ed03755f72cd1854d62f8d4b
Author: Kartik Khare <[email protected]>
AuthorDate: Fri Jan 7 10:19:41 2022 +0530

    Re-enable kinesis test (#7960)
    
    * Enable kinesis test
    
    * Upgrade localstack version and change kinesis localstack port
    
    * Adding hostname provider and old localstack docker version
    
    * Fix linting: reduce line length
    
    * Rollback to 0.12.15 localstack version
    
    * remove unused import
    
    * Revert to default port
---
 pinot-integration-tests/pom.xml                    |  2 +-
 .../tests/RealtimeKinesisIntegrationTest.java      | 48 +++++++++++-----------
 2 files changed, 24 insertions(+), 26 deletions(-)

diff --git a/pinot-integration-tests/pom.xml b/pinot-integration-tests/pom.xml
index 217d90a..3af1214 100644
--- a/pinot-integration-tests/pom.xml
+++ b/pinot-integration-tests/pom.xml
@@ -34,7 +34,7 @@
 
   <properties>
     <pinot.root>${basedir}/..</pinot.root>
-    <localstack-utils.version>0.2.15</localstack-utils.version>
+    <localstack-utils.version>0.2.19</localstack-utils.version>
     <awaitility.version>3.0.0</awaitility.version>
     <aws.sdk.version>2.14.28</aws.sdk.version>
   </properties>
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
index ed9ec88..dc93ca2 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeKinesisIntegrationTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.integration.tests;
 
 import cloud.localstack.Localstack;
+import cloud.localstack.ServiceName;
 import cloud.localstack.docker.annotation.LocalstackDockerAnnotationProcessor;
 import cloud.localstack.docker.annotation.LocalstackDockerConfiguration;
 import cloud.localstack.docker.annotation.LocalstackDockerProperties;
@@ -83,8 +84,7 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
 import software.amazon.awssdk.utils.AttributeMap;
 
 
-@LocalstackDockerProperties(services = {"kinesis"})
-@Test(enabled = false)
+@LocalstackDockerProperties(services = {ServiceName.KINESIS}, imageTag = "0.12.15")
 public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSet {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeKinesisIntegrationTest.class);
 
@@ -189,22 +189,21 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe
     String streamType = "kinesis";
     streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
 
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
-            STREAM_NAME);
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_TOPIC_NAME),
+        STREAM_NAME);
 
     streamConfigMap.put(
         StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS),
         "30000");
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
-            StreamConfig.ConsumerType.LOWLEVEL.toString());
-    streamConfigMap.put(StreamConfigProperties
-            .constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS),
-        KinesisConsumerFactory.class.getName());
-    streamConfigMap
-        .put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
-            "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+        StreamConfig.ConsumerType.LOWLEVEL.toString());
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(STREAM_TYPE,
+        StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), KinesisConsumerFactory.class.getName());
+    streamConfigMap.put(
+        StreamConfigProperties.constructStreamProperty(STREAM_TYPE, StreamConfigProperties.STREAM_DECODER_CLASS),
+        "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder");
     streamConfigMap.put(KinesisConfig.REGION, REGION);
     streamConfigMap.put(KinesisConfig.MAX_RECORDS_TO_FETCH, String.valueOf(MAX_RECORDS_TO_FETCH));
     streamConfigMap.put(KinesisConfig.SHARD_ITERATOR_TYPE, ShardIteratorType.AFTER_SEQUENCE_NUMBER.toString());
@@ -212,8 +211,8 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe
     streamConfigMap.put(KinesisConfig.ACCESS_KEY, getLocalAWSCredentials().resolveCredentials().accessKeyId());
     streamConfigMap.put(KinesisConfig.SECRET_KEY, getLocalAWSCredentials().resolveCredentials().secretAccessKey());
     streamConfigMap.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS, Integer.toString(200));
-    streamConfigMap.put(StreamConfigProperties
-        .constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
+    streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+        StreamConfigProperties.STREAM_CONSUMER_OFFSET_CRITERIA), "smallest");
     return streamConfigMap;
   }
 
@@ -225,8 +224,8 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe
     _localstackDocker.startup(dockerConfig);
 
     _kinesisClient = KinesisClient.builder().httpClient(new ApacheSdkHttpService().createHttpClientBuilder()
-        .buildWithDefaults(
-            AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()))
+            .buildWithDefaults(
+                AttributeMap.builder().put(SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES, Boolean.TRUE).build()))
         .credentialsProvider(getLocalAWSCredentials()).region(Region.of(REGION))
         .endpointOverride(new URI(LOCALSTACK_KINESIS_ENDPOINT)).build();
 
@@ -278,8 +277,8 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe
                   .partitionKey(data.get("Origin").textValue()).build();
           PutRecordResponse putRecordResponse = _kinesisClient.putRecord(putRecordRequest);
           if (putRecordResponse.sdkHttpResponse().statusCode() == 200) {
-            if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) && StringUtils
-                .isNotBlank(putRecordResponse.shardId())) {
+            if (StringUtils.isNotBlank(putRecordResponse.sequenceNumber()) && StringUtils.isNotBlank(
+                putRecordResponse.shardId())) {
               _totalRecordsPushedInStream++;
 
               int fieldIndex = 1;
@@ -321,9 +320,8 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe
       throws Exception {
     Assert.assertNotEquals(_totalRecordsPushedInStream, 0);
 
-    ResultSet pinotResultSet = getPinotConnection()
-        .execute(new Request("sql", "SELECT * FROM " + getTableName() + " ORDER BY Origin LIMIT 10000"))
-        .getResultSet(0);
+    ResultSet pinotResultSet = getPinotConnection().execute(
+        new Request("sql", "SELECT * FROM " + getTableName() + " ORDER BY Origin LIMIT 10000")).getResultSet(0);
 
     Assert.assertNotEquals(pinotResultSet.getRowCount(), 0);
 
@@ -440,8 +438,8 @@ public class RealtimeKinesisIntegrationTest extends BaseClusterIntegrationTestSe
       }
     }
 
-    _h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" + StringUtil
-        .join(",", _h2FieldNameAndTypes.toArray(new String[_h2FieldNameAndTypes.size()])) + ")").execute();
+    _h2Connection.prepareCall("CREATE TABLE " + getTableName() + "(" + StringUtil.join(",",
+        _h2FieldNameAndTypes.toArray(new String[_h2FieldNameAndTypes.size()])) + ")").execute();
   }
 
   @AfterClass(enabled = false)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to