This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 27ebe7e582 [pinot-core] Start consumption after creating segment data 
manager (#11227)
27ebe7e582 is described below

commit 27ebe7e582be7a4b8d2f5a3c34fdfd1bba325df6
Author: Aravind Suresh <[email protected]>
AuthorDate: Tue Aug 1 09:37:26 2023 +0530

    [pinot-core] Start consumption after creating segment data manager (#11227)
---
 .../data/manager/realtime/HLRealtimeSegmentDataManager.java   |  5 +++++
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java   |  4 ++--
 .../data/manager/realtime/RealtimeSegmentDataManager.java     |  6 ++++++
 .../core/data/manager/realtime/RealtimeTableDataManager.java  | 11 ++++++++---
 .../manager/realtime/LLRealtimeSegmentDataManagerTest.java    |  2 +-
 5 files changed, 22 insertions(+), 6 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index 0e9538f251..246cf83e98 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -416,6 +416,11 @@ public class HLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     throw new UnsupportedOperationException();
   }
 
+  @Override
+  public void startConsumption() {
+    // no-op
+  }
+
   @Override
   public ConsumerState getConsumerState() {
     throw new UnsupportedOperationException();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 6b9905ab16..b2c9bdcfaf 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1285,7 +1285,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     cleanupMetrics();
   }
 
-  protected void startConsumerThread() {
+  @Override
+  public void startConsumption() {
     _consumerThread = new Thread(new PartitionConsumer(), _segmentNameStr);
     _segmentLogger.info("Created new consumer thread {} for {}", 
_consumerThread, this);
     _consumerThread.start();
@@ -1472,7 +1473,6 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
       _segmentLogger
           .info("Starting consumption on realtime consuming segment {} 
maxRowCount {} maxEndTime {}", llcSegmentName,
               _segmentMaxRowCount, new DateTime(_consumeEndTime, 
DateTimeZone.UTC));
-      startConsumerThread();
     } catch (Exception e) {
       // In case of exception thrown here, segment goes to ERROR state. Then 
any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the 
semaphore is acquired, but not released.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 3111d28418..d98bc3be06 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -52,6 +52,12 @@ public abstract class RealtimeSegmentDataManager extends 
SegmentDataManager {
    */
   public abstract Map<String, String> getPartitionToCurrentOffset();
 
+  /**
+   * Starts the consumption of the underlying realtime segments.
+   * In some cases, it is helpful to not do this inside the constructor itself.
+   */
+  public abstract void startConsumption();
+
   /**
    * Get the state of the consumer
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index a7ee82b9eb..41c338d209 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -445,14 +445,19 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
       PartitionDedupMetadataManager partitionDedupMetadataManager =
           _tableDedupMetadataManager != null ? 
_tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId)
               : null;
-      segmentDataManager =
+      LLRealtimeSegmentDataManager llRealtimeSegmentDataManager =
           new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
this, _indexDir.getAbsolutePath(),
               indexLoadingConfig, schema, llcSegmentName, semaphore, 
_serverMetrics, partitionUpsertMetadataManager,
               partitionDedupMetadataManager, _isTableReadyToConsumeData);
+      llRealtimeSegmentDataManager.startConsumption();
+      segmentDataManager = llRealtimeSegmentDataManager;
     } else {
       InstanceZKMetadata instanceZKMetadata = 
ZKMetadataProvider.getInstanceZKMetadata(_propertyStore, _instanceId);
-      segmentDataManager = new HLRealtimeSegmentDataManager(segmentZKMetadata, 
tableConfig, instanceZKMetadata, this,
-          _indexDir.getAbsolutePath(), indexLoadingConfig, schema, 
_serverMetrics);
+      HLRealtimeSegmentDataManager hlRealtimeSegmentDataManager = new 
HLRealtimeSegmentDataManager(segmentZKMetadata,
+              tableConfig, instanceZKMetadata, this, 
_indexDir.getAbsolutePath(),
+              indexLoadingConfig, schema, _serverMetrics);
+      hlRealtimeSegmentDataManager.startConsumption();
+      segmentDataManager = hlRealtimeSegmentDataManager;
     }
 
     _logger.info("Initialized RealtimeSegmentDataManager - " + segmentName);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index 95f19b419e..18b2d707b1 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -1052,7 +1052,7 @@ public class LLRealtimeSegmentDataManagerTest {
     }
 
     @Override
-    protected void startConsumerThread() {
+    public void startConsumption() {
       // Do nothing.
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to