This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ced6bc282e Use higher fetch timeout for Kinesis (#12214)
ced6bc282e is described below
commit ced6bc282ea9049f45f59f99738e5f1132a03a18
Author: Kartik Khare <[email protected]>
AuthorDate: Mon Jan 22 22:16:02 2024 +0530
Use higher fetch timeout for Kinesis (#12214)
* Use higher fetch timeout for Kinesis
* Add todo
* Add test
---------
Co-authored-by: Kartik Khare <[email protected]>
Co-authored-by: Kartik Khare <[email protected]>
---
.../core/realtime/stream/StreamConfigTest.java | 27 ++++++++++++++++++++++
.../org/apache/pinot/spi/stream/StreamConfig.java | 7 +++++-
2 files changed, 33 insertions(+), 1 deletion(-)
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
index 333eecab04..11c7ee2010 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/stream/StreamConfigTest.java
@@ -403,4 +403,31 @@ public class StreamConfigTest {
// expected
}
}
+
+ @Test
+ public void testKinesisFetchTimeout() {
+ String streamType = "fakeStream";
+ String topic = "fakeTopic";
+ String tableName = "fakeTable_REALTIME";
+ String consumerFactoryClass = "KinesisConsumerFactory";
+ String decoderClass = FakeStreamMessageDecoder.class.getName();
+
+ Map<String, String> streamConfigMap = new HashMap<>();
+ streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
+ streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
+ StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
+ decoderClass);
+
+ String consumerType = "simple";
+ streamConfigMap.put(
+ StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
+ consumerType);
+ StreamConfig streamConfig = new StreamConfig(tableName, streamConfigMap);
+
+ assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS);
+ }
}
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
index 94f8adf566..ea24f5d01b 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConfig.java
@@ -48,6 +48,7 @@ public class StreamConfig {
public static final long DEFAULT_STREAM_CONNECTION_TIMEOUT_MILLIS = 30_000;
public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS = 5_000;
+ public static final int DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS = 600_000;
public static final int DEFAULT_IDLE_TIMEOUT_MILLIS = 3 * 60 * 1000;
private static final double CONSUMPTION_RATE_LIMIT_NOT_SPECIFIED = -1;
@@ -142,7 +143,11 @@ public class StreamConfig {
}
_connectionTimeoutMillis = connectionTimeoutMillis;
- int fetchTimeoutMillis = DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS;
+ // For Kinesis, we need to set a higher fetch timeout to avoid getting stuck in empty records loop
+ // TODO: Remove this once we have a better way to handle empty records in Kinesis
+ int fetchTimeoutMillis =
+ _consumerFactoryClassName.contains("KinesisConsumerFactory") ? DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS
+ : DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS;
String fetchTimeoutKey =
StreamConfigProperties.constructStreamProperty(_type, StreamConfigProperties.STREAM_FETCH_TIMEOUT_MILLIS);
String fetchTimeoutValue = streamConfigMap.get(fetchTimeoutKey);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]