This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 27ebe7e582 [pinot-core] Start consumption after creating segment data manager (#11227)
27ebe7e582 is described below
commit 27ebe7e582be7a4b8d2f5a3c34fdfd1bba325df6
Author: Aravind Suresh <[email protected]>
AuthorDate: Tue Aug 1 09:37:26 2023 +0530
[pinot-core] Start consumption after creating segment data manager (#11227)
---
.../data/manager/realtime/HLRealtimeSegmentDataManager.java | 5 +++++
.../data/manager/realtime/LLRealtimeSegmentDataManager.java | 4 ++--
.../data/manager/realtime/RealtimeSegmentDataManager.java | 6 ++++++
.../core/data/manager/realtime/RealtimeTableDataManager.java | 11 ++++++++---
.../manager/realtime/LLRealtimeSegmentDataManagerTest.java | 2 +-
5 files changed, 22 insertions(+), 6 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 0e9538f251..246cf83e98 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -416,6 +416,11 @@ public class HLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
throw new UnsupportedOperationException();
}
+ @Override
+ public void startConsumption() {
+ // no-op
+ }
+
@Override
public ConsumerState getConsumerState() {
throw new UnsupportedOperationException();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6b9905ab16..b2c9bdcfaf 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1285,7 +1285,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
cleanupMetrics();
}
- protected void startConsumerThread() {
+ @Override
+ public void startConsumption() {
_consumerThread = new Thread(new PartitionConsumer(), _segmentNameStr);
_segmentLogger.info("Created new consumer thread {} for {}",
_consumerThread, this);
_consumerThread.start();
@@ -1472,7 +1473,6 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_segmentLogger
.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", llcSegmentName,
_segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC));
- startConsumerThread();
} catch (Exception e) {
// In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from
// ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released.
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 3111d28418..d98bc3be06 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -52,6 +52,12 @@ public abstract class RealtimeSegmentDataManager extends
SegmentDataManager {
*/
public abstract Map<String, String> getPartitionToCurrentOffset();
+ /**
+ * Starts the consumption of the underlying realtime segments.
+ * In some cases, it is helpful to not do this inside the constructor itself.
+ */
+ public abstract void startConsumption();
+
/**
* Get the state of the consumer
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index a7ee82b9eb..41c338d209 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -445,14 +445,19 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
PartitionDedupMetadataManager partitionDedupMetadataManager =
_tableDedupMetadataManager != null ?
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
: null;
- segmentDataManager =
+ LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore,
_serverMetrics, partitionUpsertMetadataManager,
partitionDedupMetadataManager, _isTableReadyToConsumeData);
+ llRealtimeSegmentDataManager.startConsumption();
+ segmentDataManager = llRealtimeSegmentDataManager;
} else {
InstanceZKMetadata instanceZKMetadata =
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
- segmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, instanceZKMetadata, this,
- _indexDir.getAbsolutePath(), indexLoadingConfig, schema, _serverMetrics);
+ HLRealtimeSegmentDataManager hlRealtimeSegmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata,
+ tableConfig, instanceZKMetadata, this, _indexDir.getAbsolutePath(),
+ indexLoadingConfig, schema, _serverMetrics);
+ hlRealtimeSegmentDataManager.startConsumption();
+ segmentDataManager = hlRealtimeSegmentDataManager;
}
_logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 95f19b419e..18b2d707b1 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1052,7 +1052,7 @@ public class LLRealtimeSegmentDataManagerTest {
}
@Override
- protected void startConsumerThread() {
+ public void startConsumption() {
// Do nothing.
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]