This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 294ea08a32 Misc fixes on segment validation for uploaded real-time segments (#8786)
294ea08a32 is described below

commit 294ea08a32c2174c56739517c923f054aafb8435
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon May 30 12:29:29 2022 -0700

    Misc fixes on segment validation for uploaded real-time segments (#8786)
---
 .../common/tier/FixedTierSegmentSelector.java      |   3 +-
 .../apache/pinot/common/utils/LLCSegmentName.java  |  57 +++++----
 .../org/apache/pinot/common/utils/LLCUtils.java    |  54 --------
 .../apache/pinot/common/utils/SegmentUtils.java    |  15 ++-
 .../pinot/controller/api/upload/ZKOperator.java    |  13 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 121 +++++++++---------
 .../core/realtime/SegmentCompletionManager.java    |   7 +-
 .../helix/core/util/ZKMetadataUtils.java           |  37 +++---
 .../RealtimeSegmentValidationManager.java          |  96 +++++++--------
 .../controller/api/upload/ZKOperatorTest.java      | 136 ++++++++++++++++++---
 .../controller/helix/PinotResourceManagerTest.java |   5 +-
 .../helix/core/realtime/SegmentCompletionTest.java |   4 -
 .../validation/ValidationManagerTest.java          |  33 +++--
 .../manager/realtime/RealtimeTableDataManager.java |  51 ++++----
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |  59 ++++-----
 .../RealtimeIndexOffHeapMemoryManager.java         |   4 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  15 ++-
 17 files changed, 373 insertions(+), 337 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java b/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java
index c9db7478d0..19ac10c91e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java
@@ -25,7 +25,6 @@ import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
@@ -64,7 +63,7 @@ public class FixedTierSegmentSelector implements 
TierSegmentSelector {
               segmentName);
       Preconditions.checkNotNull(segmentZKMetadata, "Could not find zk 
metadata for segment: {} of table: {}",
           segmentName, tableNameWithType);
-      return 
!segmentZKMetadata.getStatus().equals(Realtime.Status.IN_PROGRESS);
+      return segmentZKMetadata.getStatus().isCompleted();
     }
     return false;
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 4bae626f49..2a533a0e68 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.common.utils;
 
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
 import org.apache.commons.lang3.StringUtils;
 import org.joda.time.DateTime;
 import org.joda.time.format.DateTimeFormat;
@@ -35,16 +38,13 @@ public class LLCSegmentName extends SegmentName implements 
Comparable {
   private final String _segmentName;
 
   public LLCSegmentName(String segmentName) {
-    if (!isLowLevelConsumerSegmentName(segmentName)) {
-      throw new RuntimeException(segmentName + " is not a Low level consumer 
segment name");
-    }
-
-    _segmentName = segmentName;
     String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+    Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name: 
%s", segmentName);
     _tableName = parts[0];
     _partitionGroupId = Integer.parseInt(parts[1]);
     _sequenceNumber = Integer.parseInt(parts[2]);
     _creationTime = parts[3];
+    _segmentName = segmentName;
   }
 
   public LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, long msSinceEpoch) {
@@ -59,6 +59,28 @@ public class LLCSegmentName extends SegmentName implements 
Comparable {
     _segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR + 
sequenceNumber + SEPARATOR + _creationTime;
   }
 
+  private LLCSegmentName(String tableName, int partitionGroupId, int 
sequenceNumber, String creationTime,
+      String segmentName) {
+    _tableName = tableName;
+    _partitionGroupId = partitionGroupId;
+    _sequenceNumber = sequenceNumber;
+    _creationTime = creationTime;
+    _segmentName = segmentName;
+  }
+
+  /**
+   * Returns the {@link LLCSegmentName} for the given segment name, or {@code 
null} if the given segment name does not
+   * represent an LLC segment.
+   */
+  @Nullable
+  public static LLCSegmentName of(String segmentName) {
+    String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+    if (parts.length != 4) {
+      return null;
+    }
+    return new LLCSegmentName(parts[0], Integer.parseInt(parts[1]), 
Integer.parseInt(parts[2]), parts[3], segmentName);
+  }
+
   /**
    * Returns the sequence number of the given segment name.
    */
@@ -145,32 +167,13 @@ public class LLCSegmentName extends SegmentName 
implements Comparable {
     if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    LLCSegmentName segName = (LLCSegmentName) o;
-
-    if (_partitionGroupId != segName._partitionGroupId) {
-      return false;
-    }
-    if (_sequenceNumber != segName._sequenceNumber) {
-      return false;
-    }
-    if (_tableName != null ? !_tableName.equals(segName._tableName) : 
segName._tableName != null) {
-      return false;
-    }
-    if (_creationTime != null ? !_creationTime.equals(segName._creationTime) : 
segName._creationTime != null) {
-      return false;
-    }
-    return !(_segmentName != null ? !_segmentName.equals(segName._segmentName) 
: segName._segmentName != null);
+    LLCSegmentName that = (LLCSegmentName) o;
+    return _segmentName.equals(that._segmentName);
   }
 
   @Override
   public int hashCode() {
-    int result = _tableName != null ? _tableName.hashCode() : 0;
-    result = 31 * result + _partitionGroupId;
-    result = 31 * result + _sequenceNumber;
-    result = 31 * result + (_creationTime != null ? _creationTime.hashCode() : 
0);
-    result = 31 * result + (_segmentName != null ? _segmentName.hashCode() : 
0);
-    return result;
+    return Objects.hash(_segmentName);
   }
 
   @Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
deleted file mode 100644
index ee71e29f1e..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-
-public class LLCUtils {
-  private LLCUtils() {
-  }
-
-  /**
-   * Compute the table of a sorted list of segments grouped by Kafka partition.
-   *
-   * @param segmentSet is the set of segment names that need to be sorted.
-   * @return map of Stream partition to sorted set of segment names
-   */
-  public static Map<String, SortedSet<SegmentName>> 
sortSegmentsByStreamPartition(Set<String> segmentSet) {
-    Map<String, SortedSet<SegmentName>> sortedSegmentsByStreamPartition = new 
HashMap<>();
-    for (String segment : segmentSet) {
-      // Ignore segments that are not low level consumer segments
-      if (!SegmentName.isLowLevelConsumerSegmentName(segment)) {
-        continue;
-      }
-
-      final LLCSegmentName segmentName = new LLCSegmentName(segment);
-      String streamPartitionId = segmentName.getPartitionRange();
-      SortedSet<SegmentName> segmentsForPartition =
-          sortedSegmentsByStreamPartition.computeIfAbsent(streamPartitionId, k 
-> new TreeSet<>());
-      segmentsForPartition.add(segmentName);
-    }
-    return sortedSegmentsByStreamPartition;
-  }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index 04be4a1c58..ad0f49cc2b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -37,18 +37,17 @@ public class SegmentUtils {
   // path.
   @Nullable
   public static Integer getRealtimeSegmentPartitionId(String segmentName, 
String realtimeTableName,
-      HelixManager helixManager,
-      String partitionColumn) {
-    // A fast path if the segmentName is a LLC segment name and we can get the 
partition id from the name directly.
-    if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      return new LLCSegmentName(segmentName).getPartitionGroupId();
+      HelixManager helixManager, String partitionColumn) {
+    // A fast path if the segmentName is an LLC segment name: get the 
partition id from the name directly
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    if (llcSegmentName != null) {
+      return llcSegmentName.getPartitionGroupId();
     }
     // Otherwise, retrieve the partition id from the segment zk metadata.
     SegmentZKMetadata segmentZKMetadata =
         
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), 
realtimeTableName, segmentName);
-    Preconditions
-        .checkState(segmentZKMetadata != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
-            segmentName, realtimeTableName);
+    Preconditions.checkState(segmentZKMetadata != null,
+        "Failed to find segment ZK metadata for segment: %s of table: %s", 
segmentName, realtimeTableName);
     SegmentPartitionMetadata segmentPartitionMetadata = 
segmentZKMetadata.getPartitionMetadata();
     if (segmentPartitionMetadata != null) {
       ColumnPartitionMetadata columnPartitionMetadata =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 7a12d9e5eb..b75d112ce2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -242,9 +242,16 @@ public class ZKOperator {
       long segmentSizeInBytes, boolean enableParallelPushProtection, 
HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
-    SegmentZKMetadata newSegmentZKMetadata =
-        ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, 
segmentMetadata, downloadUrl, crypterName,
-            segmentSizeInBytes);
+    SegmentZKMetadata newSegmentZKMetadata;
+    try {
+      newSegmentZKMetadata =
+          ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, 
segmentMetadata, downloadUrl, crypterName,
+              segmentSizeInBytes);
+    } catch (IllegalArgumentException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Got invalid segment metadata when adding segment: %s 
for table: %s, reason: %s", segmentName,
+              tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST);
+    }
 
     // Lock if enableParallelPushProtection is true.
     if (enableParallelPushProtection) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 1a7428687b..75a3b84f93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -156,10 +156,10 @@ public class PinotLLCRealtimeSegmentManager {
   private final Lock[] _idealStateUpdateLocks;
   private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
   private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
+  private final FileUploadDownloadClient _fileUploadDownloadClient;
+  private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
 
   private volatile boolean _isStopping = false;
-  private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
-  private FileUploadDownloadClient _fileUploadDownloadClient;
 
   public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager 
helixResourceManager, ControllerConf controllerConf,
       ControllerMetrics controllerMetrics) {
@@ -179,9 +179,7 @@ public class PinotLLCRealtimeSegmentManager {
     }
     _flushThresholdUpdateManager = new FlushThresholdUpdateManager();
     _isDeepStoreLLCSegmentUploadRetryEnabled = 
controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
-    if (_isDeepStoreLLCSegmentUploadRetryEnabled) {
-      _fileUploadDownloadClient = initFileUploadDownloadClient();
-    }
+    _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ? 
initFileUploadDownloadClient() : null;
   }
 
   public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
@@ -210,10 +208,10 @@ public class PinotLLCRealtimeSegmentManager {
     for (String segment : idealState.getRecord().getMapFields().keySet()) {
       // With Pinot upsert table allowing uploads of segments, the segment 
name of an upsert table segment may not
       // conform to LLCSegment format. We can skip such segments because they 
are NOT the consuming segments.
-      if (!LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+      if (llcSegmentName == null) {
         continue;
       }
-      LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
       int partitionGroupId = llcSegmentName.getPartitionGroupId();
       partitionGroupIdToLatestSegment.compute(partitionGroupId, (k, 
latestSegment) -> {
         if (latestSegment == null) {
@@ -249,8 +247,8 @@ public class PinotLLCRealtimeSegmentManager {
 
   public void stop() {
     _isStopping = true;
-    LOGGER
-        .info("Awaiting segment metadata commits: maxWaitTimeMillis = {}", 
MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
+    LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}",
+        MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
     long millisToWait = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
 
     // Busy-wait for all segments that are committing metadata to complete 
their operation.
@@ -363,11 +361,11 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
     try {
-      return InstancePartitionsUtils
-          .fetchOrComputeInstancePartitions(_helixManager, tableConfig, 
InstancePartitionsType.CONSUMING);
+      return 
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager, 
tableConfig,
+          InstancePartitionsType.CONSUMING);
     } catch (Exception e) {
-      _controllerMetrics
-          .addMeteredTableValue(tableConfig.getTableName(), 
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
+      _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(), 
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES,
+          1L);
       throw e;
     }
   }
@@ -399,12 +397,11 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   SegmentZKMetadata getSegmentZKMetadata(String realtimeTableName, String 
segmentName, @Nullable Stat stat) {
     try {
-      ZNRecord znRecord = _propertyStore
-          
.get(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, 
segmentName), stat,
-              AccessOption.PERSISTENT);
-      Preconditions
-          .checkState(znRecord != null, "Failed to find segment ZK metadata 
for segment: %s of table: %s", segmentName,
-              realtimeTableName);
+      ZNRecord znRecord =
+          
_propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName,
 segmentName),
+              stat, AccessOption.PERSISTENT);
+      Preconditions.checkState(znRecord != null, "Failed to find segment ZK 
metadata for segment: %s of table: %s",
+          segmentName, realtimeTableName);
       return new SegmentZKMetadata(znRecord);
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
@@ -417,9 +414,9 @@ public class PinotLLCRealtimeSegmentManager {
     String segmentName = segmentZKMetadata.getSegmentName();
     LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName);
     try {
-      Preconditions.checkState(_propertyStore
-              
.set(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName, 
segmentName),
-                  segmentZKMetadata.toZNRecord(), expectedVersion, 
AccessOption.PERSISTENT),
+      Preconditions.checkState(
+          
_propertyStore.set(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName,
 segmentName),
+              segmentZKMetadata.toZNRecord(), expectedVersion, 
AccessOption.PERSISTENT),
           "Failed to persist segment ZK metadata for segment: %s of table: 
%s", segmentName, realtimeTableName);
     } catch (Exception e) {
       _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
@@ -522,9 +519,9 @@ public class PinotLLCRealtimeSegmentManager {
     TableConfig tableConfig = getTableConfig(realtimeTableName);
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     IdealState idealState = getIdealState(realtimeTableName);
-    Preconditions
-        
.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
-            "Failed to find instance in CONSUMING state in IdealState for 
segment: %s", committingSegmentName);
+    Preconditions.checkState(
+        
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+        "Failed to find instance in CONSUMING state in IdealState for segment: 
%s", committingSegmentName);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     /*
@@ -668,9 +665,9 @@ public class PinotLLCRealtimeSegmentManager {
     String segmentName = newLLCSegmentName.getSegmentName();
     String startOffset = committingSegmentDescriptor.getNextOffset();
 
-    LOGGER
-        .info("Creating segment ZK metadata for new CONSUMING segment: {} with 
start offset: {} and creation time: {}",
-            segmentName, startOffset, creationTimeMs);
+    LOGGER.info(
+        "Creating segment ZK metadata for new CONSUMING segment: {} with start 
offset: {} and creation time: {}",
+        segmentName, startOffset, creationTimeMs);
 
     SegmentZKMetadata newSegmentZKMetadata = new 
SegmentZKMetadata(segmentName);
     newSegmentZKMetadata.setCreationTime(creationTimeMs);
@@ -761,8 +758,8 @@ public class PinotLLCRealtimeSegmentManager {
   @VisibleForTesting
   List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig 
streamConfig,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList) {
-    return PinotTableIdealStateBuilder
-        .getPartitionGroupMetadataList(streamConfig, 
currentPartitionGroupConsumptionStatusList);
+    return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
+        currentPartitionGroupConsumptionStatusList);
   }
 
   /**
@@ -869,8 +866,8 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: We need to find a place to detect and update a gauge for 
nonConsumingPartitionsCount for a table, and
    * reset it to 0 at the end of validateLLC
    */
-  public void ensureAllPartitionsConsuming(TableConfig tableConfig,
-      PartitionLevelStreamConfig streamConfig, boolean 
recreateDeletedConsumingSegment) {
+  public void ensureAllPartitionsConsuming(TableConfig tableConfig, 
PartitionLevelStreamConfig streamConfig,
+      boolean recreateDeletedConsumingSegment) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
@@ -931,8 +928,8 @@ public class PinotLLCRealtimeSegmentManager {
     if (committingSegmentName != null) {
       // Change committing segment state to ONLINE
       Set<String> instances = 
instanceStatesMap.get(committingSegmentName).keySet();
-      instanceStatesMap
-          .put(committingSegmentName, 
SegmentAssignmentUtils.getInstanceStateMap(instances, 
SegmentStateModel.ONLINE));
+      instanceStatesMap.put(committingSegmentName,
+          SegmentAssignmentUtils.getInstanceStateMap(instances, 
SegmentStateModel.ONLINE));
       LOGGER.info("Updating segment: {} to ONLINE state", 
committingSegmentName);
     }
 
@@ -948,13 +945,13 @@ public class PinotLLCRealtimeSegmentManager {
       int partitionId = newLLCSegmentName.getPartitionGroupId();
       int seqNum = newLLCSegmentName.getSequenceNumber();
       for (String segmentNameStr : instanceStatesMap.keySet()) {
-        if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentNameStr)) {
+        LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
+        if (llcSegmentName == null) {
           // skip the segment name if the name is not in low-level consumer 
format
           // such segment name can appear for uploaded segment
           LOGGER.debug("Skip segment name {} not in low-level consumer 
format", segmentNameStr);
           continue;
         }
-        LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);
         if (llcSegmentName.getPartitionGroupId() == partitionId && 
llcSegmentName.getSequenceNumber() == seqNum) {
           String errorMsg =
               String.format("Segment %s is a duplicate of existing segment 
%s", newSegmentName, segmentNameStr);
@@ -1140,18 +1137,18 @@ public class PinotLLCRealtimeSegmentManager {
                 segmentAssignment, instancePartitionsMap, startOffset);
           } else {
             if (newPartitionGroupSet.contains(partitionGroupId)) {
-              if (recreateDeletedConsumingSegment && 
Status.DONE.equals(latestSegmentZKMetadata.getStatus())
+              if (recreateDeletedConsumingSegment && 
latestSegmentZKMetadata.getStatus().isCompleted()
                   && isAllInstancesInState(instanceStateMap, 
SegmentStateModel.ONLINE)) {
                 // If we get here, that means in IdealState, the latest 
segment has all replicas ONLINE.
                 // Create a new IN_PROGRESS segment in PROPERTYSTORE,
                 // add it as CONSUMING segment to IDEALSTATE.
                 StreamPartitionMsgOffset startOffset = 
offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
                 createNewConsumingSegment(tableConfig, streamConfig, 
latestSegmentZKMetadata, currentTimeMs,
-                    partitionGroupId, newPartitionGroupMetadataList, 
instancePartitions,
-                    instanceStatesMap, segmentAssignment, 
instancePartitionsMap, startOffset);
+                    partitionGroupId, newPartitionGroupMetadataList, 
instancePartitions, instanceStatesMap,
+                    segmentAssignment, instancePartitionsMap, startOffset);
               } else {
-                LOGGER.error("Got unexpected instance state map: {} for 
segment: {}",
-                    instanceStateMap, latestSegmentName);
+                LOGGER.error("Got unexpected instance state map: {} for 
segment: {}", instanceStateMap,
+                    latestSegmentName);
               }
             }
             // else, the partition group has reached end of life. This is an 
acceptable state
@@ -1173,17 +1170,16 @@ public class PinotLLCRealtimeSegmentManager {
           // Find the previous CONSUMING segment
           String previousConsumingSegment = null;
           for (Map.Entry<String, Map<String, String>> segmentEntry : 
instanceStatesMap.entrySet()) {
-            LLCSegmentName llcSegmentName = new 
LLCSegmentName(segmentEntry.getKey());
-            if (llcSegmentName.getPartitionGroupId() == partitionGroupId && 
segmentEntry.getValue()
-                .containsValue(SegmentStateModel.CONSUMING)) {
-              previousConsumingSegment = llcSegmentName.getSegmentName();
+            if 
(segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING)
+                && new 
LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == 
partitionGroupId) {
+              previousConsumingSegment = segmentEntry.getKey();
               break;
             }
           }
           if (previousConsumingSegment == null) {
-            LOGGER
-                .error("Failed to find previous CONSUMING segment for 
partition: {} of table: {}, potential data loss",
-                    partitionGroupId, realtimeTableName);
+            LOGGER.error(
+                "Failed to find previous CONSUMING segment for partition: {} 
of table: {}, potential data loss",
+                partitionGroupId, realtimeTableName);
             _controllerMetrics.addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
           }
           updateInstanceStatesForNewConsumingSegment(instanceStatesMap, 
previousConsumingSegment, latestSegmentName,
@@ -1232,9 +1228,8 @@ public class PinotLLCRealtimeSegmentManager {
 
     CommittingSegmentDescriptor committingSegmentDescriptor =
         new 
CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(), 
startOffset.toString(), 0);
-    createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, 
currentTimeMs,
-        committingSegmentDescriptor, latestSegmentZKMetadata, 
instancePartitions, numPartitions,
-        numReplicas, newPartitionGroupMetadataList);
+    createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, 
currentTimeMs, committingSegmentDescriptor,
+        latestSegmentZKMetadata, instancePartitions, numPartitions, 
numReplicas, newPartitionGroupMetadataList);
     String newSegmentName = newLLCSegmentName.getSegmentName();
     updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
newSegmentName, segmentAssignment,
         instancePartitionsMap);
@@ -1338,13 +1333,9 @@ public class PinotLLCRealtimeSegmentManager {
    * TODO: Add an on-demand way to upload LLC segment to deep store for a 
specific table.
    */
   public void uploadToDeepStoreIfMissing(TableConfig tableConfig, 
List<SegmentZKMetadata> segmentsZKMetadata) {
-    String realtimeTableName = tableConfig.getTableName();
+    Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
-    if (_isStopping) {
-      LOGGER.info("Skipped fixing deep store copy of LLC segments for table 
{}, because segment manager is stopping.",
-          realtimeTableName);
-      return;
-    }
+    String realtimeTableName = tableConfig.getTableName();
 
     // Use this retention value to avoid the data racing between segment 
upload and retention management.
     RetentionStrategy retentionStrategy = null;
@@ -1367,11 +1358,8 @@ public class PinotLLCRealtimeSegmentManager {
       //  servers. We may need to rate control the upload request if it is 
changed to be in parallel.
       String segmentName = segmentZKMetadata.getSegmentName();
       try {
-        if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-          continue;
-        }
-        // Only fix the committed / externally uploaded llc segment without 
deep store copy
-        if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS
+        // Only fix the committed (DONE) LLC segment without deep store copy 
(empty download URL)
+        if (!SegmentName.isLowLevelConsumerSegmentName(segmentName) || 
segmentZKMetadata.getStatus() != Status.DONE
             || 
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
 {
           continue;
         }
@@ -1386,8 +1374,9 @@ public class PinotLLCRealtimeSegmentManager {
         List<URI> peerSegmentURIs =
             PeerServerSegmentFinder.getPeerServerURIs(segmentName, 
CommonConstants.HTTP_PROTOCOL, _helixManager);
         if (peerSegmentURIs.isEmpty()) {
-          throw new IllegalStateException(String
-              .format("Failed to upload segment %s to deep store because no 
online replica is found", segmentName));
+          throw new IllegalStateException(
+              String.format("Failed to upload segment %s to deep store because 
no online replica is found",
+                  segmentName));
         }
 
         // Randomly ask one server to upload
@@ -1404,8 +1393,8 @@ public class PinotLLCRealtimeSegmentManager {
         LOGGER.info("Successfully uploaded LLC segment {} to deep store with 
download url: {}", segmentName,
             segmentDownloadUrl);
       } catch (Exception e) {
-        _controllerMetrics
-            .addMeteredTableValue(realtimeTableName, 
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
+        _controllerMetrics.addMeteredTableValue(realtimeTableName,
+            ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
         LOGGER.error("Failed to upload segment {} to deep store", segmentName, 
e);
       }
     }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 597eb1255b..7f5f7185a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -118,11 +118,6 @@ public class SegmentCompletionManager {
     return System.currentTimeMillis();
   }
 
-  public StreamPartitionMsgOffsetFactory 
getStreamPartitionMsgOffsetFactory(String segmentName) {
-    final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
-    return getStreamPartitionMsgOffsetFactory(llcSegmentName);
-  }
-
   protected StreamPartitionMsgOffsetFactory 
getStreamPartitionMsgOffsetFactory(LLCSegmentName llcSegmentName) {
     final String rawTableName = llcSegmentName.getTableName();
     TableConfig tableConfig = 
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
@@ -149,7 +144,7 @@ public class SegmentCompletionManager {
         final String realtimeTableName = 
TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
         SegmentZKMetadata segmentMetadata =
             _segmentManager.getSegmentZKMetadata(realtimeTableName, 
segmentName.getSegmentName(), null);
-        if 
(segmentMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.DONE))
 {
+        if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
           // Best to go through the state machine for this case as well, so 
that all code regarding state handling is
           // in one place
           // Also good for synchronization, because it is possible that 
multiple threads take this path, and we don't
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 11e67b1663..0afcb3aa59 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.controller.helix.core.util;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.SegmentName;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.segment.spi.partition.PartitionFunction;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
@@ -41,8 +43,7 @@ public class ZKMetadataUtils {
       String downloadUrl, @Nullable String crypterName, long 
segmentSizeInBytes) {
     SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(segmentMetadata.getName());
     updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypterName,
-        segmentSizeInBytes);
-    segmentZKMetadata.setPushTime(System.currentTimeMillis());
+        segmentSizeInBytes, true);
     return segmentZKMetadata;
   }
 
@@ -52,12 +53,18 @@ public class ZKMetadataUtils {
   public static void refreshSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
       SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypterName, long segmentSizeInBytes) {
     updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata, 
segmentMetadata, downloadUrl, crypterName,
-        segmentSizeInBytes);
-    segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+        segmentSizeInBytes, false);
   }
 
   private static void updateSegmentZKMetadata(String tableNameWithType, 
SegmentZKMetadata segmentZKMetadata,
-      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypterName, long segmentSizeInBytes) {
+      SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String 
crypterName, long segmentSizeInBytes,
+      boolean newSegment) {
+    if (newSegment) {
+      segmentZKMetadata.setPushTime(System.currentTimeMillis());
+    } else {
+      segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+    }
+
     if (segmentMetadata.getTimeInterval() != null) {
       segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
       segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
@@ -67,11 +74,8 @@ public class ZKMetadataUtils {
       segmentZKMetadata.setEndTime(-1);
       segmentZKMetadata.setTimeUnit(null);
     }
-    if (segmentMetadata.getVersion() != null) {
-      segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
-    } else {
-      segmentZKMetadata.setIndexVersion(null);
-    }
+    segmentZKMetadata.setIndexVersion(
+        segmentMetadata.getVersion() != null ? 
segmentMetadata.getVersion().name() : null);
     segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
     segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
     segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
@@ -90,11 +94,8 @@ public class ZKMetadataUtils {
         columnPartitionMap.put(column, columnPartitionMetadata);
       }
     });
-    if (!columnPartitionMap.isEmpty()) {
-      segmentZKMetadata.setPartitionMetadata(new 
SegmentPartitionMetadata(columnPartitionMap));
-    } else {
-      segmentZKMetadata.setPartitionMetadata(null);
-    }
+    segmentZKMetadata.setPartitionMetadata(
+        !columnPartitionMap.isEmpty() ? new 
SegmentPartitionMetadata(columnPartitionMap) : null);
 
     // Update custom metadata
     // NOTE: Do not remove existing keys because they can be set by the HTTP 
header from the segment upload request
@@ -110,6 +111,12 @@ public class ZKMetadataUtils {
     if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
       
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
 
+      // For new segment, start/end offset must exist if the segment name 
follows LLC segment name convention
+      if (newSegment && 
SegmentName.isLowLevelConsumerSegmentName(segmentMetadata.getName())) {
+        Preconditions.checkArgument(segmentMetadata.getStartOffset() != null 
&& segmentMetadata.getEndOffset() != null,
+            "New uploaded LLC segment must have start/end offset in the 
segment metadata");
+      }
+
       // NOTE:
       // - If start/end offset is available in the uploaded segment, update 
them in the segment ZK metadata
       // - If not, keep the existing start/end offset in the segment ZK 
metadata unchanged
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5831c0617a..b772fcabd1 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -34,9 +34,7 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -90,41 +88,39 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
   @Override
   protected void processTable(String tableNameWithType, Context context) {
-    TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-    if (tableType == TableType.REALTIME) {
+    if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+      return;
+    }
 
-      TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
-      if (tableConfig == null) {
-        LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
-        return;
-      }
+    TableConfig tableConfig = 
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+    if (tableConfig == null) {
+      LOGGER.warn("Failed to find table config for table: {}, skipping 
validation", tableNameWithType);
+      return;
+    }
+    PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
+        IngestionConfigUtils.getStreamConfigMap(tableConfig));
 
-      if (context._runSegmentLevelValidation) {
-        runSegmentLevelValidation(tableConfig);
-      }
+    if (context._runSegmentLevelValidation) {
+      runSegmentLevelValidation(tableConfig, streamConfig);
+    }
 
-      PartitionLevelStreamConfig streamConfig = new 
PartitionLevelStreamConfig(tableConfig.getTableName(),
-          IngestionConfigUtils.getStreamConfigMap(tableConfig));
-      if (streamConfig.hasLowLevelConsumerType()) {
-        _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
-            streamConfig, context._recreateDeletedConsumingSegment);
-      }
+    if (streamConfig.hasLowLevelConsumerType()) {
+      _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig, 
streamConfig,
+          context._recreateDeletedConsumingSegment);
     }
   }
 
-  private void runSegmentLevelValidation(TableConfig tableConfig) {
+  private void runSegmentLevelValidation(TableConfig tableConfig, 
PartitionLevelStreamConfig streamConfig) {
     String realtimeTableName = tableConfig.getTableName();
     List<SegmentZKMetadata> segmentsZKMetadata = 
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
-    boolean countHLCSegments = true;  // false if this table has ONLY LLC 
segments (i.e. fully migrated)
-    StreamConfig streamConfig =
-        new StreamConfig(realtimeTableName, 
IngestionConfigUtils.getStreamConfigMap(tableConfig));
-    if (streamConfig.hasLowLevelConsumerType() && 
!streamConfig.hasHighLevelConsumerType()) {
-      countHLCSegments = false;
-    }
-    // Update the gauge to contain the total document count in the segments
-    
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
-        computeRealtimeTotalDocumentInSegments(segmentsZKMetadata, 
countHLCSegments));
 
+    // Update the total document count gauge
+    // Count HLC segments if high level consumer is configured
+    boolean countHLCSegments = streamConfig.hasHighLevelConsumerType();
+    _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
+        computeTotalDocumentCount(segmentsZKMetadata, countHLCSegments));
+
+    // Check missing segments and upload them to the deep store
     if (streamConfig.hasLowLevelConsumerType()
         && 
_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
       _llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig, 
segmentsZKMetadata);
@@ -134,42 +130,42 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
   @Override
   protected void nonLeaderCleanup(List<String> tableNamesWithType) {
     for (String tableNameWithType : tableNamesWithType) {
-      TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
-      if (tableType == TableType.REALTIME) {
+      if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
         _validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
       }
     }
   }
 
   @VisibleForTesting
-  static long computeRealtimeTotalDocumentInSegments(List<SegmentZKMetadata> 
segmentsZKMetadata,
-      boolean countHLCSegments) {
+  static long computeTotalDocumentCount(List<SegmentZKMetadata> 
segmentsZKMetadata, boolean countHLCSegments) {
     long numTotalDocs = 0;
-
-    String groupId = "";
-    for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
-      String segmentName = segmentZKMetadata.getSegmentName();
-      if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
-        if (countHLCSegments) {
+    if (countHLCSegments) {
+      String groupId = null;
+      for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+        String segmentName = segmentZKMetadata.getSegmentName();
+        if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
           HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
-          String segmentGroupIdName = hlcSegmentName.getGroupId();
-
-          if (groupId.isEmpty()) {
-            groupId = segmentGroupIdName;
-          }
-          // Discard all segments with different groupids as they are replicas
-          if (groupId.equals(segmentGroupIdName) && 
segmentZKMetadata.getTotalDocs() >= 0) {
-            numTotalDocs += segmentZKMetadata.getTotalDocs();
+          String segmentGroupId = hlcSegmentName.getGroupId();
+          if (groupId == null) {
+            groupId = segmentGroupId;
+            numTotalDocs = segmentZKMetadata.getTotalDocs();
+          } else {
+            // Discard all segments with different group id as they are 
replicas
+            if (groupId.equals(segmentGroupId)) {
+              numTotalDocs += segmentZKMetadata.getTotalDocs();
+            }
           }
         }
-      } else {
-        // Low level segments
-        if (!countHLCSegments) {
+      }
+    } else {
+      for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+        String segmentName = segmentZKMetadata.getSegmentName();
+        if (!SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+          // LLC segments or uploaded segments
           numTotalDocs += segmentZKMetadata.getTotalDocs();
         }
       }
     }
-
     return numTotalDocs;
   }
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 1f29a8731d..28c33eb256 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -20,15 +20,24 @@ package org.apache.pinot.controller.api.upload;
 
 import java.io.File;
 import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
 import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.URIUtils;
 import org.apache.pinot.controller.ControllerConf;
+import 
org.apache.pinot.controller.api.exception.ControllerApplicationException;
 import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.segment.spi.SegmentMetadata;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
@@ -38,32 +47,57 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
 
 
 public class ZKOperatorTest {
-  private static final String TABLE_NAME = "operatorTestTable";
-  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String OFFLINE_TABLE_NAME = 
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+  private static final String REALTIME_TABLE_NAME = 
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  private static final String TIME_COLUMN = "timeColumn";
   private static final String SEGMENT_NAME = "testSegment";
+  // NOTE: The FakeStreamConsumerFactory will create 2 stream partitions. Use 
partition 2 to avoid conflict.
+  private static final String LLC_SEGMENT_NAME =
+      new LLCSegmentName(RAW_TABLE_NAME, 2, 0, 
System.currentTimeMillis()).getSegmentName();
   private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
 
+  private PinotHelixResourceManager _resourceManager;
+
   @BeforeClass
   public void setUp()
       throws Exception {
     TEST_INSTANCE.setupSharedStateAndValidate();
+    _resourceManager = TEST_INSTANCE.getHelixResourceManager();
+
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addDateTime(TIME_COLUMN, DataType.TIMESTAMP, 
"1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS").build();
+    TableConfig offlineTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+    TableConfig realtimeTableConfig =
+        new 
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+            
.setStreamConfigs(getStreamConfigs()).setLLC(true).setNumReplicas(1).build();
+
+    _resourceManager.addSchema(schema, false);
+    _resourceManager.addTable(offlineTableConfig);
+    _resourceManager.addTable(realtimeTableConfig);
+  }
 
-    TableConfig tableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
-    TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
+  private Map<String, String> getStreamConfigs() {
+    Map<String, String> streamConfigs = new HashMap<>();
+    streamConfigs.put("streamType", "kafka");
+    streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
+    streamConfigs.put("stream.kafka.consumer.type", "simple");
+    streamConfigs.put("stream.kafka.decoder.class.name",
+        "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
+    streamConfigs.put("stream.kafka.consumer.factory.class.name",
+        
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
+    return streamConfigs;
   }
 
   @Test
   public void testCompleteSegmentOperations()
       throws Exception {
-    ZKOperator zkOperator = new 
ZKOperator(TEST_INSTANCE.getHelixResourceManager(), mock(ControllerConf.class),
-        mock(ControllerMetrics.class));
+    ZKOperator zkOperator = new ZKOperator(_resourceManager, 
mock(ControllerConf.class), mock(ControllerMetrics.class));
+
     SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
     when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
     when(segmentMetadata.getCrc()).thenReturn("12345");
@@ -86,16 +120,14 @@ public class ZKOperatorTest {
 
     // Wait for the segment Zk entry to be deleted.
     TestUtils.waitForCondition(aVoid -> {
-      SegmentZKMetadata segmentZKMetadata =
-          
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
+      SegmentZKMetadata segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
       return segmentZKMetadata == null;
     }, 30_000L, "Failed to delete segmentZkMetadata.");
 
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, "downloadUrl", "crypter", 10,
         true, true, httpHeaders);
 
-    SegmentZKMetadata segmentZKMetadata =
-        
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
+    SegmentZKMetadata segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
     assertEquals(segmentZKMetadata.getCreationTime(), 123L);
@@ -132,8 +164,8 @@ public class ZKOperatorTest {
     when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, "otherDownloadUrl",
         "otherCrypter", 10, true, true, httpHeaders);
-    segmentZKMetadata =
-        
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
+
+    segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
     // Push time should not change
@@ -155,8 +187,8 @@ public class ZKOperatorTest {
     Thread.sleep(10);
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, 
null, null, "otherDownloadUrl",
         "otherCrypter", 100, true, true, httpHeaders);
-    segmentZKMetadata =
-        
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
 SEGMENT_NAME);
+
+    segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 23456L);
     // Push time should not change
@@ -169,6 +201,74 @@ public class ZKOperatorTest {
     assertEquals(segmentZKMetadata.getSizeInBytes(), 100);
   }
 
+  @Test
+  public void testPushToRealtimeTable()
+      throws Exception {
+    ZKOperator zkOperator = new ZKOperator(_resourceManager, 
mock(ControllerConf.class), mock(ControllerMetrics.class));
+
+    SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+    when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
+    when(segmentMetadata.getCrc()).thenReturn("12345");
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, 
null, null, "downloadUrl", null, 10,
+        true, true, mock(HttpHeaders.class));
+
+    SegmentZKMetadata segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+    assertNull(segmentMetadata.getStartOffset());
+    assertNull(segmentMetadata.getEndOffset());
+
+    // Uploading a segment with LLC segment name but without start/end offset 
should fail
+    when(segmentMetadata.getName()).thenReturn(LLC_SEGMENT_NAME);
+    when(segmentMetadata.getCrc()).thenReturn("23456");
+    try {
+      zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, 
segmentMetadata, null, null, "downloadUrl", null, 10,
+          true, true, mock(HttpHeaders.class));
+      fail();
+    } catch (ControllerApplicationException e) {
+      assertEquals(e.getResponse().getStatus(), 
Response.Status.BAD_REQUEST.getStatusCode());
+    }
+    assertNull(_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, 
LLC_SEGMENT_NAME));
+
+    // Uploading a segment with LLC segment name and start/end offset should 
success
+    when(segmentMetadata.getStartOffset()).thenReturn("0");
+    when(segmentMetadata.getEndOffset()).thenReturn("1234");
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, 
null, null, "downloadUrl", null, 10,
+        true, true, mock(HttpHeaders.class));
+
+    segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+    assertEquals(segmentZKMetadata.getStartOffset(), "0");
+    assertEquals(segmentZKMetadata.getEndOffset(), "1234");
+
+    // Refreshing a segment with LLC segment name but without start/end offset 
should success
+    when(segmentMetadata.getCrc()).thenReturn("34567");
+    when(segmentMetadata.getStartOffset()).thenReturn(null);
+    when(segmentMetadata.getEndOffset()).thenReturn(null);
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, 
null, null, "downloadUrl", null, 10,
+        true, true, mock(HttpHeaders.class));
+
+    segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+    assertEquals(segmentZKMetadata.getStartOffset(), "0");
+    assertEquals(segmentZKMetadata.getEndOffset(), "1234");
+
+    // Refreshing a segment with LLC segment name and start/end offset should 
override the offsets
+    when(segmentMetadata.getCrc()).thenReturn("45678");
+    when(segmentMetadata.getStartOffset()).thenReturn("1234");
+    when(segmentMetadata.getEndOffset()).thenReturn("2345");
+    zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, 
null, null, "downloadUrl", null, 10,
+        true, true, mock(HttpHeaders.class));
+
+    segmentZKMetadata = 
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+    assertEquals(segmentZKMetadata.getStartOffset(), "1234");
+    assertEquals(segmentZKMetadata.getEndOffset(), "2345");
+  }
+
   @AfterClass
   public void tearDown() {
     TEST_INSTANCE.cleanup();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 711b552fee..204a2ff7dd 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -221,8 +221,9 @@ public class PinotResourceManagerTest {
     Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
     for (String segment : segments) {
       Integer partitionId;
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
-        partitionId = new LLCSegmentName(segment).getPartitionGroupId();
+      LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+      if (llcSegmentName != null) {
+        partitionId = llcSegmentName.getPartitionGroupId();
       } else {
         partitionId = segment2PartitionId.get(segment);
       }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 53efb6ad9c..2ada570ac8 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -1349,10 +1349,6 @@ public class SegmentCompletionTest {
       return new LongMsgOffsetFactory();
     }
 
-    public StreamPartitionMsgOffsetFactory 
getStreamPartitionMsgOffsetFactory(String segmentName) {
-      return new LongMsgOffsetFactory();
-    }
-
     @Override
     protected long getCurrentTimeMs() {
       return _seconds * 1000L;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index c981ed0f14..c371dc9dc2 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -36,12 +36,13 @@ import org.joda.time.DateTime;
 import org.joda.time.Duration;
 import org.joda.time.Interval;
 import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
 
 
 /**
@@ -54,29 +55,27 @@ public class ValidationManagerTest {
   private static final String TEST_SEGMENT_NAME = "testSegment";
   private static final int EXPECTED_VERSION = -1;
 
-  private TableConfig _offlineTableConfig;
-
   @BeforeClass
   public void setUp()
       throws Exception {
     TEST_INSTANCE.setupSharedStateAndValidate();
 
-    _offlineTableConfig =
+    TableConfig offlineTableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2).build();
-    TEST_INSTANCE.getHelixResourceManager().addTable(_offlineTableConfig);
+    TEST_INSTANCE.getHelixResourceManager().addTable(offlineTableConfig);
   }
 
   @Test
   public void testPushTimePersistence() {
     SegmentMetadata segmentMetadata = 
SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME, 
TEST_SEGMENT_NAME);
 
-    TEST_INSTANCE.getHelixResourceManager()
-        .addNewSegment(OFFLINE_TEST_TABLE_NAME, segmentMetadata, 
"downloadUrl");
+    
TEST_INSTANCE.getHelixResourceManager().addNewSegment(OFFLINE_TEST_TABLE_NAME, 
segmentMetadata, "downloadUrl");
     SegmentZKMetadata segmentZKMetadata =
         
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
 TEST_SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
     long pushTime = segmentZKMetadata.getPushTime();
     // Check that the segment has been pushed in the last 30 seconds
-    Assert.assertTrue(System.currentTimeMillis() - pushTime < 30_000);
+    assertTrue(System.currentTimeMillis() - pushTime < 30_000);
     // Check that there is no refresh time
     assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
 
@@ -84,8 +83,8 @@ public class ValidationManagerTest {
     // NOTE: In order to send the refresh message, the segment need to be in 
the ExternalView
     String offlineTableName = 
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
     TestUtils.waitForCondition(aVoid -> {
-      ExternalView externalView = TEST_INSTANCE.getHelixAdmin()
-          .getResourceExternalView(TEST_INSTANCE.getHelixClusterName(), 
offlineTableName);
+      ExternalView externalView =
+          
TEST_INSTANCE.getHelixAdmin().getResourceExternalView(TEST_INSTANCE.getHelixClusterName(),
 offlineTableName);
       return externalView != null && 
externalView.getPartitionSet().contains(TEST_SEGMENT_NAME);
     }, 30_000L, "Failed to find the segment in the ExternalView");
     
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
@@ -94,15 +93,15 @@ public class ValidationManagerTest {
 
     segmentZKMetadata =
         
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
 TEST_SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
     // Check that the segment still has the same push time
     assertEquals(segmentZKMetadata.getPushTime(), pushTime);
     // Check that the refresh time is in the last 30 seconds
-    Assert.assertTrue(System.currentTimeMillis() - 
segmentZKMetadata.getRefreshTime() < 30_000L);
+    assertTrue(System.currentTimeMillis() - segmentZKMetadata.getRefreshTime() 
< 30_000L);
   }
 
   @Test
-  public void testTotalDocumentCountRealTime()
-      throws Exception {
+  public void testTotalDocumentCountRealTime() {
     // Create a bunch of dummy segments
     final String group1 = TEST_TABLE_NAME + "_REALTIME_1466446700000_34";
     final String group2 = TEST_TABLE_NAME + "_REALTIME_1466446700000_17";
@@ -118,17 +117,17 @@ public class ValidationManagerTest {
     // This should get ignored in the count as it belongs to a different group 
id
     
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(segmentName4,
 20));
 
-    
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentsZKMetadata,
 true), 60);
+    
assertEquals(RealtimeSegmentValidationManager.computeTotalDocumentCount(segmentsZKMetadata,
 true), 60);
 
-    // Now add some low level segment names
+    // Now add some LLC segments (both committed and uploaded)
     String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0, 
1000).getSegmentName();
     String segmentName6 = new LLCSegmentName(TEST_TABLE_NAME, 2, 27, 
10000).getSegmentName();
     
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(segmentName5,
 10));
     
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(segmentName6,
 5));
+    
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(TEST_SEGMENT_NAME,
 15));
 
     // Only the LLC segments should get counted.
-    
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentsZKMetadata,
 false),
-        15);
+    
assertEquals(RealtimeSegmentValidationManager.computeTotalDocumentCount(segmentsZKMetadata,
 false), 30);
   }
 
   @Test
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 198023632c..d8db5624b9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -290,61 +290,52 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // of the index directory and loading segment from it
     LoaderUtils.reloadFailureRecovery(segmentDir);
 
-    boolean isLLCSegment = 
SegmentName.isLowLevelConsumerSegmentName(segmentName);
-    if (segmentDir.exists()) {
-      // Segment already exists on disk
-      if (segmentZKMetadata.getStatus() == Status.DONE || 
segmentZKMetadata.getStatus() == Status.UPLOADED) {
-        // Metadata has been committed, load the local segment
+    boolean isHLCSegment = 
SegmentName.isHighLevelConsumerSegmentName(segmentName);
+    if (segmentZKMetadata.getStatus().isCompleted()) {
+      if (segmentDir.exists()) {
+        // Local segment exists, try to load it
         try {
           addSegment(ImmutableSegmentLoader.load(segmentDir, 
indexLoadingConfig, schema));
           return;
         } catch (Exception e) {
-          if (isLLCSegment) {
-            // For LLC and segments, delete the local copy and download a new 
copy from the controller
-            FileUtils.deleteQuietly(segmentDir);
+          if (!isHLCSegment) {
+            // For LLC and uploaded segments, delete the local copy and 
download a new copy
             _logger.error("Caught exception while loading segment: {}, 
downloading a new copy", segmentName, e);
+            FileUtils.deleteQuietly(segmentDir);
           } else {
             // For HLC segments, throw out the exception because there is no 
way to recover (controller does not have a
             // copy of the segment)
-            throw e;
+            throw new RuntimeException("Failed to load local HLC segment: " + 
segmentName, e);
           }
         }
       } else {
-        // Metadata has not been committed, delete the local segment
-        FileUtils.deleteQuietly(segmentDir);
+        if (isHLCSegment) {
+          throw new RuntimeException("Failed to find local copy for committed 
HLC segment: " + segmentName);
+        }
       }
-    } else if (segmentZKMetadata.getStatus() == Status.UPLOADED) {
-      // The segment is uploaded to an upsert enabled realtime table. Download 
the segment and load.
-      String downloadUrl = segmentZKMetadata.getDownloadUrl();
-      Preconditions.checkNotNull(downloadUrl, "Upload segment metadata has no 
download url");
-      downloadSegmentFromDeepStore(segmentName, indexLoadingConfig, 
downloadUrl);
-      _logger
-          .info("Downloaded, untarred and add segment {} of table {} from {}", 
segmentName, tableConfig.getTableName(),
-              downloadUrl);
+      // Local segment doesn't exist or cannot load, download a new copy
+      downloadAndReplaceSegment(segmentName, segmentZKMetadata, 
indexLoadingConfig, tableConfig);
       return;
+    } else {
+      // Metadata has not been committed, delete the local segment if exists
+      FileUtils.deleteQuietly(segmentDir);
     }
 
-    // Start a new consuming segment or download the segment from the 
controller
-
+    // Start a new consuming segment
     if (!isValid(schema, tableConfig.getIndexingConfig())) {
       _logger.error("Not adding segment {}", segmentName);
       throw new RuntimeException("Mismatching schema/table config for " + 
_tableNameWithType);
     }
     
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, 
segmentName);
 
-    if (isLLCSegment) {
-      if (segmentZKMetadata.getStatus() == Status.DONE) {
-        downloadAndReplaceSegment(segmentName, segmentZKMetadata, 
indexLoadingConfig, tableConfig);
-        return;
-      }
-
+    if (!isHLCSegment) {
       // Generates only one semaphore for every partitionGroupId
       LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
       int partitionGroupId = llcSegmentName.getPartitionGroupId();
       Semaphore semaphore = 
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new 
Semaphore(1));
       PartitionUpsertMetadataManager partitionUpsertMetadataManager =
-          _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager
-              .getOrCreatePartitionManager(partitionGroupId) : null;
+          _tableUpsertMetadataManager != null ? 
_tableUpsertMetadataManager.getOrCreatePartitionManager(
+              partitionGroupId) : null;
       segmentDataManager =
           new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig, 
this, _indexDir.getAbsolutePath(),
               indexLoadingConfig, schema, llcSegmentName, semaphore, 
_serverMetrics, partitionUpsertMetadataManager);
@@ -419,7 +410,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   @Override
   protected boolean allowDownload(String segmentName, SegmentZKMetadata 
zkMetadata) {
-    // Only LLC immutable segment allows download.
+    // Cannot download HLC segment or consuming segment
     if (SegmentName.isHighLevelConsumerSegmentName(segmentName) || 
zkMetadata.getStatus() == Status.IN_PROGRESS) {
       return false;
     }
diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index c96d5d4791..76d8333672 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -117,18 +117,18 @@ public class RealtimeToOfflineSegmentsTaskGenerator 
extends BaseTaskGenerator {
         continue;
       }
 
-      // Get all segment metadata for completed segments (DONE status).
+      // Get all segment metadata for completed segments (DONE/UPLOADED 
status).
       List<SegmentZKMetadata> completedSegmentsZKMetadata = new ArrayList<>();
-      Map<Integer, String> partitionToLatestCompletedSegmentName = new 
HashMap<>();
+      Map<Integer, String> partitionToLatestLLCSegmentName = new HashMap<>();
       Set<Integer> allPartitions = new HashSet<>();
-      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, 
partitionToLatestCompletedSegmentName,
+      getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata, 
partitionToLatestLLCSegmentName,
           allPartitions);
       if (completedSegmentsZKMetadata.isEmpty()) {
         LOGGER.info("No realtime-completed segments found for table: {}, 
skipping task generation: {}",
             realtimeTableName, taskType);
         continue;
       }
-      allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+      allPartitions.removeAll(partitionToLatestLLCSegmentName.keySet());
       if (!allPartitions.isEmpty()) {
         LOGGER.info(
             "Partitions: {} have no completed segments. Table: {} is not ready 
for {}. Skipping task generation.",
@@ -158,7 +158,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends 
BaseTaskGenerator {
       // (exclusive)
       List<String> segmentNames = new ArrayList<>();
       List<String> downloadURLs = new ArrayList<>();
-      Set<String> lastCompletedSegmentPerPartition = new 
HashSet<>(partitionToLatestCompletedSegmentName.values());
+      Set<String> lastLLCSegmentPerPartition = new 
HashSet<>(partitionToLatestLLCSegmentName.values());
       boolean skipGenerate = false;
       while (true) {
         // Check that execution window is older than bufferTime
@@ -180,7 +180,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends 
BaseTaskGenerator {
             // If last completed segment is being used, make sure that segment 
crosses over end of window.
             // In the absence of this check, CONSUMING segments could contain 
some portion of the window. That data
             // would be skipped forever.
-            if (lastCompletedSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
+            if (lastLLCSegmentPerPartition.contains(segmentName) && 
segmentEndTimeMs < windowEndMs) {
               LOGGER.info("Window data overflows into CONSUMING segments for 
partition of segment: {}. Skipping task "
                   + "generation: {}", segmentName, taskType);
               skipGenerate = true;
@@ -243,41 +243,44 @@ public class RealtimeToOfflineSegmentsTaskGenerator 
extends BaseTaskGenerator {
   }
 
   /**
-   * Fetch completed (non-consuming) segment and partition information
+   * Fetch completed (DONE/UPLOADED) segment and partition information
+   *
    * @param realtimeTableName the realtime table name
-   * @param completedSegmentsZKMetadata list for collecting the completed 
segments ZK metadata
-   * @param partitionToLatestCompletedSegmentName map for collecting the 
partitionId to the latest completed segment
-   *                                              name
+   * @param completedSegmentsZKMetadata list for collecting the completed 
(DONE/UPLOADED) segments ZK metadata
+   * @param partitionToLatestLLCSegmentName map for collecting the partitionId 
to the latest LLC segment name
    * @param allPartitions set for collecting all partition ids
    */
   private void getCompletedSegmentsInfo(String realtimeTableName, 
List<SegmentZKMetadata> completedSegmentsZKMetadata,
-      Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer> 
allPartitions) {
+      Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer> 
allPartitions) {
     List<SegmentZKMetadata> segmentsZKMetadata = 
_clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName);
 
     Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
     for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
-      LLCSegmentName llcSegmentName = new 
LLCSegmentName(segmentZKMetadata.getSegmentName());
-      allPartitions.add(llcSegmentName.getPartitionGroupId());
-
-      if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+      Segment.Realtime.Status status = segmentZKMetadata.getStatus();
+      if (status.isCompleted()) {
         completedSegmentsZKMetadata.add(segmentZKMetadata);
-        latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
-            (partitionGroupId, latestLLCSegmentName) -> {
-              if (latestLLCSegmentName == null) {
-                return llcSegmentName;
-              } else {
-                if (llcSegmentName.getSequenceNumber() > 
latestLLCSegmentName.getSequenceNumber()) {
-                  return llcSegmentName;
-                } else {
-                  return latestLLCSegmentName;
-                }
-              }
-            });
+      }
+
+      // Skip UPLOADED segments that don't conform to the LLC segment name
+      LLCSegmentName llcSegmentName = 
LLCSegmentName.of(segmentZKMetadata.getSegmentName());
+      if (llcSegmentName != null) {
+        int partitionId = llcSegmentName.getPartitionGroupId();
+        allPartitions.add(partitionId);
+        if (status.isCompleted()) {
+          latestLLCSegmentNameMap.compute(partitionId, (k, 
latestLLCSegmentName) -> {
+            if (latestLLCSegmentName == null
+                || llcSegmentName.getSequenceNumber() > 
latestLLCSegmentName.getSequenceNumber()) {
+              return llcSegmentName;
+            } else {
+              return latestLLCSegmentName;
+            }
+          });
+        }
       }
     }
 
     for (Map.Entry<Integer, LLCSegmentName> entry : 
latestLLCSegmentNameMap.entrySet()) {
-      partitionToLatestCompletedSegmentName.put(entry.getKey(), 
entry.getValue().getSegmentName());
+      partitionToLatestLLCSegmentName.put(entry.getKey(), 
entry.getValue().getSegmentName());
     }
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
index 12614e2718..861fa953a1 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
@@ -51,8 +51,8 @@ public abstract class RealtimeIndexOffHeapMemoryManager 
implements PinotDataBuff
   protected RealtimeIndexOffHeapMemoryManager(ServerMetrics serverMetrics, 
String segmentName) {
     _serverMetrics = serverMetrics;
     _segmentName = segmentName;
-    if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
-      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    if (llcSegmentName != null) {
       _tableName = llcSegmentName.getTableName();
     } else if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
       HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index be0eeb3e00..0b6426f92a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -544,11 +544,16 @@ public class CommonConstants {
   public static class Segment {
     public static class Realtime {
       public enum Status {
-        // Means the segment is in CONSUMING state.
-        IN_PROGRESS, // Means the segment is in ONLINE state (segment 
completed consuming and has been saved in
-        // segment store).
-        DONE, // Means the segment is uploaded to a Pinot controller by an 
external party.
-        UPLOADED
+        IN_PROGRESS, // The segment is still consuming data
+        DONE, // The segment has finished consumption and has been committed 
to the segment store
+        UPLOADED; // The segment is uploaded by an external party
+
+        /**
+         * Returns {@code true} if the segment is completed (DONE/UPLOADED), 
{@code false} otherwise.
+         */
+        public boolean isCompleted() {
+          return this != IN_PROGRESS;
+        }
       }
 
       /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to