This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 294ea08a32 Misc fixes on segment validation for uploaded real-time segments (#8786)
294ea08a32 is described below
commit 294ea08a32c2174c56739517c923f054aafb8435
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon May 30 12:29:29 2022 -0700
Misc fixes on segment validation for uploaded real-time segments (#8786)
---
.../common/tier/FixedTierSegmentSelector.java | 3 +-
.../apache/pinot/common/utils/LLCSegmentName.java | 57 +++++----
.../org/apache/pinot/common/utils/LLCUtils.java | 54 --------
.../apache/pinot/common/utils/SegmentUtils.java | 15 ++-
.../pinot/controller/api/upload/ZKOperator.java | 13 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 121 +++++++++---------
.../core/realtime/SegmentCompletionManager.java | 7 +-
.../helix/core/util/ZKMetadataUtils.java | 37 +++---
.../RealtimeSegmentValidationManager.java | 96 +++++++--------
.../controller/api/upload/ZKOperatorTest.java | 136 ++++++++++++++++++---
.../controller/helix/PinotResourceManagerTest.java | 5 +-
.../helix/core/realtime/SegmentCompletionTest.java | 4 -
.../validation/ValidationManagerTest.java | 33 +++--
.../manager/realtime/RealtimeTableDataManager.java | 51 ++++----
.../RealtimeToOfflineSegmentsTaskGenerator.java | 59 ++++-----
.../RealtimeIndexOffHeapMemoryManager.java | 4 +-
.../apache/pinot/spi/utils/CommonConstants.java | 15 ++-
17 files changed, 373 insertions(+), 337 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java b/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java
index c9db7478d0..19ac10c91e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/FixedTierSegmentSelector.java
@@ -25,7 +25,6 @@ import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -64,7 +63,7 @@ public class FixedTierSegmentSelector implements TierSegmentSelector {
segmentName);
Preconditions.checkNotNull(segmentZKMetadata, "Could not find zk
metadata for segment: {} of table: {}",
segmentName, tableNameWithType);
- return
!segmentZKMetadata.getStatus().equals(Realtime.Status.IN_PROGRESS);
+ return segmentZKMetadata.getStatus().isCompleted();
}
return false;
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
index 4bae626f49..2a533a0e68 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCSegmentName.java
@@ -18,6 +18,9 @@
*/
package org.apache.pinot.common.utils;
+import com.google.common.base.Preconditions;
+import java.util.Objects;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.DateTime;
import org.joda.time.format.DateTimeFormat;
@@ -35,16 +38,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
private final String _segmentName;
public LLCSegmentName(String segmentName) {
- if (!isLowLevelConsumerSegmentName(segmentName)) {
- throw new RuntimeException(segmentName + " is not a Low level consumer
segment name");
- }
-
- _segmentName = segmentName;
String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+ Preconditions.checkArgument(parts.length == 4, "Invalid LLC segment name:
%s", segmentName);
_tableName = parts[0];
_partitionGroupId = Integer.parseInt(parts[1]);
_sequenceNumber = Integer.parseInt(parts[2]);
_creationTime = parts[3];
+ _segmentName = segmentName;
}
public LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, long msSinceEpoch) {
@@ -59,6 +59,28 @@ public class LLCSegmentName extends SegmentName implements Comparable {
_segmentName = tableName + SEPARATOR + partitionGroupId + SEPARATOR +
sequenceNumber + SEPARATOR + _creationTime;
}
+ private LLCSegmentName(String tableName, int partitionGroupId, int
sequenceNumber, String creationTime,
+ String segmentName) {
+ _tableName = tableName;
+ _partitionGroupId = partitionGroupId;
+ _sequenceNumber = sequenceNumber;
+ _creationTime = creationTime;
+ _segmentName = segmentName;
+ }
+
+ /**
+ * Returns the {@link LLCSegmentName} for the given segment name, or {@code
null} if the given segment name does not
+ * represent an LLC segment.
+ */
+ @Nullable
+ public static LLCSegmentName of(String segmentName) {
+ String[] parts = StringUtils.splitByWholeSeparator(segmentName, SEPARATOR);
+ if (parts.length != 4) {
+ return null;
+ }
+ return new LLCSegmentName(parts[0], Integer.parseInt(parts[1]),
Integer.parseInt(parts[2]), parts[3], segmentName);
+ }
+
/**
* Returns the sequence number of the given segment name.
*/
@@ -145,32 +167,13 @@ public class LLCSegmentName extends SegmentName implements Comparable {
if (o == null || getClass() != o.getClass()) {
return false;
}
-
- LLCSegmentName segName = (LLCSegmentName) o;
-
- if (_partitionGroupId != segName._partitionGroupId) {
- return false;
- }
- if (_sequenceNumber != segName._sequenceNumber) {
- return false;
- }
- if (_tableName != null ? !_tableName.equals(segName._tableName) :
segName._tableName != null) {
- return false;
- }
- if (_creationTime != null ? !_creationTime.equals(segName._creationTime) :
segName._creationTime != null) {
- return false;
- }
- return !(_segmentName != null ? !_segmentName.equals(segName._segmentName)
: segName._segmentName != null);
+ LLCSegmentName that = (LLCSegmentName) o;
+ return _segmentName.equals(that._segmentName);
}
@Override
public int hashCode() {
- int result = _tableName != null ? _tableName.hashCode() : 0;
- result = 31 * result + _partitionGroupId;
- result = 31 * result + _sequenceNumber;
- result = 31 * result + (_creationTime != null ? _creationTime.hashCode() :
0);
- result = 31 * result + (_segmentName != null ? _segmentName.hashCode() :
0);
- return result;
+ return Objects.hash(_segmentName);
}
@Override
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
deleted file mode 100644
index ee71e29f1e..0000000000
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/LLCUtils.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.common.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-
-
-public class LLCUtils {
- private LLCUtils() {
- }
-
- /**
- * Compute the table of a sorted list of segments grouped by Kafka partition.
- *
- * @param segmentSet is the set of segment names that need to be sorted.
- * @return map of Stream partition to sorted set of segment names
- */
- public static Map<String, SortedSet<SegmentName>>
sortSegmentsByStreamPartition(Set<String> segmentSet) {
- Map<String, SortedSet<SegmentName>> sortedSegmentsByStreamPartition = new
HashMap<>();
- for (String segment : segmentSet) {
- // Ignore segments that are not low level consumer segments
- if (!SegmentName.isLowLevelConsumerSegmentName(segment)) {
- continue;
- }
-
- final LLCSegmentName segmentName = new LLCSegmentName(segment);
- String streamPartitionId = segmentName.getPartitionRange();
- SortedSet<SegmentName> segmentsForPartition =
- sortedSegmentsByStreamPartition.computeIfAbsent(streamPartitionId, k
-> new TreeSet<>());
- segmentsForPartition.add(segmentName);
- }
- return sortedSegmentsByStreamPartition;
- }
-}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
index 04be4a1c58..ad0f49cc2b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java
@@ -37,18 +37,17 @@ public class SegmentUtils {
// path.
@Nullable
public static Integer getRealtimeSegmentPartitionId(String segmentName,
String realtimeTableName,
- HelixManager helixManager,
- String partitionColumn) {
- // A fast path if the segmentName is a LLC segment name and we can get the
partition id from the name directly.
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- return new LLCSegmentName(segmentName).getPartitionGroupId();
+ HelixManager helixManager, String partitionColumn) {
+ // A fast path if the segmentName is an LLC segment name: get the
partition id from the name directly
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ if (llcSegmentName != null) {
+ return llcSegmentName.getPartitionGroupId();
}
// Otherwise, retrieve the partition id from the segment zk metadata.
SegmentZKMetadata segmentZKMetadata =
ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(),
realtimeTableName, segmentName);
- Preconditions
- .checkState(segmentZKMetadata != null, "Failed to find segment ZK
metadata for segment: %s of table: %s",
- segmentName, realtimeTableName);
+ Preconditions.checkState(segmentZKMetadata != null,
+ "Failed to find segment ZK metadata for segment: %s of table: %s",
segmentName, realtimeTableName);
SegmentPartitionMetadata segmentPartitionMetadata =
segmentZKMetadata.getPartitionMetadata();
if (segmentPartitionMetadata != null) {
ColumnPartitionMetadata columnPartitionMetadata =
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 7a12d9e5eb..b75d112ce2 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -242,9 +242,16 @@ public class ZKOperator {
long segmentSizeInBytes, boolean enableParallelPushProtection,
HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
- SegmentZKMetadata newSegmentZKMetadata =
- ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType,
segmentMetadata, downloadUrl, crypterName,
- segmentSizeInBytes);
+ SegmentZKMetadata newSegmentZKMetadata;
+ try {
+ newSegmentZKMetadata =
+ ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType,
segmentMetadata, downloadUrl, crypterName,
+ segmentSizeInBytes);
+ } catch (IllegalArgumentException e) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Got invalid segment metadata when adding segment: %s
for table: %s, reason: %s", segmentName,
+ tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST);
+ }
// Lock if enableParallelPushProtection is true.
if (enableParallelPushProtection) {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 1a7428687b..75a3b84f93 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -156,10 +156,10 @@ public class PinotLLCRealtimeSegmentManager {
private final Lock[] _idealStateUpdateLocks;
private final FlushThresholdUpdateManager _flushThresholdUpdateManager;
private final boolean _isDeepStoreLLCSegmentUploadRetryEnabled;
+ private final FileUploadDownloadClient _fileUploadDownloadClient;
+ private final AtomicInteger _numCompletingSegments = new AtomicInteger(0);
private volatile boolean _isStopping = false;
- private AtomicInteger _numCompletingSegments = new AtomicInteger(0);
- private FileUploadDownloadClient _fileUploadDownloadClient;
public PinotLLCRealtimeSegmentManager(PinotHelixResourceManager
helixResourceManager, ControllerConf controllerConf,
ControllerMetrics controllerMetrics) {
@@ -179,9 +179,7 @@ public class PinotLLCRealtimeSegmentManager {
}
_flushThresholdUpdateManager = new FlushThresholdUpdateManager();
_isDeepStoreLLCSegmentUploadRetryEnabled =
controllerConf.isDeepStoreRetryUploadLLCSegmentEnabled();
- if (_isDeepStoreLLCSegmentUploadRetryEnabled) {
- _fileUploadDownloadClient = initFileUploadDownloadClient();
- }
+ _fileUploadDownloadClient = _isDeepStoreLLCSegmentUploadRetryEnabled ?
initFileUploadDownloadClient() : null;
}
public boolean isDeepStoreLLCSegmentUploadRetryEnabled() {
@@ -210,10 +208,10 @@ public class PinotLLCRealtimeSegmentManager {
for (String segment : idealState.getRecord().getMapFields().keySet()) {
// With Pinot upsert table allowing uploads of segments, the segment
name of an upsert table segment may not
// conform to LLCSegment format. We can skip such segments because they
are NOT the consuming segments.
- if (!LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+ if (llcSegmentName == null) {
continue;
}
- LLCSegmentName llcSegmentName = new LLCSegmentName(segment);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
partitionGroupIdToLatestSegment.compute(partitionGroupId, (k,
latestSegment) -> {
if (latestSegment == null) {
@@ -249,8 +247,8 @@ public class PinotLLCRealtimeSegmentManager {
public void stop() {
_isStopping = true;
- LOGGER
- .info("Awaiting segment metadata commits: maxWaitTimeMillis = {}",
MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
+ LOGGER.info("Awaiting segment metadata commits: maxWaitTimeMillis = {}",
+ MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS);
long millisToWait = MAX_LLC_SEGMENT_METADATA_COMMIT_TIME_MILLIS;
// Busy-wait for all segments that are committing metadata to complete
their operation.
@@ -363,11 +361,11 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
InstancePartitions getConsumingInstancePartitions(TableConfig tableConfig) {
try {
- return InstancePartitionsUtils
- .fetchOrComputeInstancePartitions(_helixManager, tableConfig,
InstancePartitionsType.CONSUMING);
+ return
InstancePartitionsUtils.fetchOrComputeInstancePartitions(_helixManager,
tableConfig,
+ InstancePartitionsType.CONSUMING);
} catch (Exception e) {
- _controllerMetrics
- .addMeteredTableValue(tableConfig.getTableName(),
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
+ _controllerMetrics.addMeteredTableValue(tableConfig.getTableName(),
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES,
+ 1L);
throw e;
}
}
@@ -399,12 +397,11 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
SegmentZKMetadata getSegmentZKMetadata(String realtimeTableName, String
segmentName, @Nullable Stat stat) {
try {
- ZNRecord znRecord = _propertyStore
-
.get(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName,
segmentName), stat,
- AccessOption.PERSISTENT);
- Preconditions
- .checkState(znRecord != null, "Failed to find segment ZK metadata
for segment: %s of table: %s", segmentName,
- realtimeTableName);
+ ZNRecord znRecord =
+
_propertyStore.get(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName,
segmentName),
+ stat, AccessOption.PERSISTENT);
+ Preconditions.checkState(znRecord != null, "Failed to find segment ZK
metadata for segment: %s of table: %s",
+ segmentName, realtimeTableName);
return new SegmentZKMetadata(znRecord);
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L);
@@ -417,9 +414,9 @@ public class PinotLLCRealtimeSegmentManager {
String segmentName = segmentZKMetadata.getSegmentName();
LOGGER.info("Persisting segment ZK metadata for segment: {}", segmentName);
try {
- Preconditions.checkState(_propertyStore
-
.set(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName,
segmentName),
- segmentZKMetadata.toZNRecord(), expectedVersion,
AccessOption.PERSISTENT),
+ Preconditions.checkState(
+
_propertyStore.set(ZKMetadataProvider.constructPropertyStorePathForSegment(realtimeTableName,
segmentName),
+ segmentZKMetadata.toZNRecord(), expectedVersion,
AccessOption.PERSISTENT),
"Failed to persist segment ZK metadata for segment: %s of table:
%s", segmentName, realtimeTableName);
} catch (Exception e) {
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_ZOOKEEPER_UPDATE_FAILURES, 1L);
@@ -522,9 +519,9 @@ public class PinotLLCRealtimeSegmentManager {
TableConfig tableConfig = getTableConfig(realtimeTableName);
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
IdealState idealState = getIdealState(realtimeTableName);
- Preconditions
-
.checkState(idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
- "Failed to find instance in CONSUMING state in IdealState for
segment: %s", committingSegmentName);
+ Preconditions.checkState(
+
idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING),
+ "Failed to find instance in CONSUMING state in IdealState for segment:
%s", committingSegmentName);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
/*
@@ -668,9 +665,9 @@ public class PinotLLCRealtimeSegmentManager {
String segmentName = newLLCSegmentName.getSegmentName();
String startOffset = committingSegmentDescriptor.getNextOffset();
- LOGGER
- .info("Creating segment ZK metadata for new CONSUMING segment: {} with
start offset: {} and creation time: {}",
- segmentName, startOffset, creationTimeMs);
+ LOGGER.info(
+ "Creating segment ZK metadata for new CONSUMING segment: {} with start
offset: {} and creation time: {}",
+ segmentName, startOffset, creationTimeMs);
SegmentZKMetadata newSegmentZKMetadata = new
SegmentZKMetadata(segmentName);
newSegmentZKMetadata.setCreationTime(creationTimeMs);
@@ -761,8 +758,8 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig
streamConfig,
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList) {
- return PinotTableIdealStateBuilder
- .getPartitionGroupMetadataList(streamConfig,
currentPartitionGroupConsumptionStatusList);
+ return
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig,
+ currentPartitionGroupConsumptionStatusList);
}
/**
@@ -869,8 +866,8 @@ public class PinotLLCRealtimeSegmentManager {
* TODO: We need to find a place to detect and update a gauge for
nonConsumingPartitionsCount for a table, and
* reset it to 0 at the end of validateLLC
*/
- public void ensureAllPartitionsConsuming(TableConfig tableConfig,
- PartitionLevelStreamConfig streamConfig, boolean
recreateDeletedConsumingSegment) {
+ public void ensureAllPartitionsConsuming(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig,
+ boolean recreateDeletedConsumingSegment) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
@@ -931,8 +928,8 @@ public class PinotLLCRealtimeSegmentManager {
if (committingSegmentName != null) {
// Change committing segment state to ONLINE
Set<String> instances =
instanceStatesMap.get(committingSegmentName).keySet();
- instanceStatesMap
- .put(committingSegmentName,
SegmentAssignmentUtils.getInstanceStateMap(instances,
SegmentStateModel.ONLINE));
+ instanceStatesMap.put(committingSegmentName,
+ SegmentAssignmentUtils.getInstanceStateMap(instances,
SegmentStateModel.ONLINE));
LOGGER.info("Updating segment: {} to ONLINE state",
committingSegmentName);
}
@@ -948,13 +945,13 @@ public class PinotLLCRealtimeSegmentManager {
int partitionId = newLLCSegmentName.getPartitionGroupId();
int seqNum = newLLCSegmentName.getSequenceNumber();
for (String segmentNameStr : instanceStatesMap.keySet()) {
- if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentNameStr)) {
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentNameStr);
+ if (llcSegmentName == null) {
// skip the segment name if the name is not in low-level consumer
format
// such segment name can appear for uploaded segment
LOGGER.debug("Skip segment name {} not in low-level consumer
format", segmentNameStr);
continue;
}
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentNameStr);
if (llcSegmentName.getPartitionGroupId() == partitionId &&
llcSegmentName.getSequenceNumber() == seqNum) {
String errorMsg =
String.format("Segment %s is a duplicate of existing segment
%s", newSegmentName, segmentNameStr);
@@ -1140,18 +1137,18 @@ public class PinotLLCRealtimeSegmentManager {
segmentAssignment, instancePartitionsMap, startOffset);
} else {
if (newPartitionGroupSet.contains(partitionGroupId)) {
- if (recreateDeletedConsumingSegment &&
Status.DONE.equals(latestSegmentZKMetadata.getStatus())
+ if (recreateDeletedConsumingSegment &&
latestSegmentZKMetadata.getStatus().isCompleted()
&& isAllInstancesInState(instanceStateMap,
SegmentStateModel.ONLINE)) {
// If we get here, that means in IdealState, the latest
segment has all replicas ONLINE.
// Create a new IN_PROGRESS segment in PROPERTYSTORE,
// add it as CONSUMING segment to IDEALSTATE.
StreamPartitionMsgOffset startOffset =
offsetFactory.create(latestSegmentZKMetadata.getEndOffset());
createNewConsumingSegment(tableConfig, streamConfig,
latestSegmentZKMetadata, currentTimeMs,
- partitionGroupId, newPartitionGroupMetadataList,
instancePartitions,
- instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
+ partitionGroupId, newPartitionGroupMetadataList,
instancePartitions, instanceStatesMap,
+ segmentAssignment, instancePartitionsMap, startOffset);
} else {
- LOGGER.error("Got unexpected instance state map: {} for
segment: {}",
- instanceStateMap, latestSegmentName);
+ LOGGER.error("Got unexpected instance state map: {} for
segment: {}", instanceStateMap,
+ latestSegmentName);
}
}
// else, the partition group has reached end of life. This is an
acceptable state
@@ -1173,17 +1170,16 @@ public class PinotLLCRealtimeSegmentManager {
// Find the previous CONSUMING segment
String previousConsumingSegment = null;
for (Map.Entry<String, Map<String, String>> segmentEntry :
instanceStatesMap.entrySet()) {
- LLCSegmentName llcSegmentName = new
LLCSegmentName(segmentEntry.getKey());
- if (llcSegmentName.getPartitionGroupId() == partitionGroupId &&
segmentEntry.getValue()
- .containsValue(SegmentStateModel.CONSUMING)) {
- previousConsumingSegment = llcSegmentName.getSegmentName();
+ if
(segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING)
+ && new
LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() ==
partitionGroupId) {
+ previousConsumingSegment = segmentEntry.getKey();
break;
}
}
if (previousConsumingSegment == null) {
- LOGGER
- .error("Failed to find previous CONSUMING segment for
partition: {} of table: {}, potential data loss",
- partitionGroupId, realtimeTableName);
+ LOGGER.error(
+ "Failed to find previous CONSUMING segment for partition: {}
of table: {}, potential data loss",
+ partitionGroupId, realtimeTableName);
_controllerMetrics.addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_STREAM_DATA_LOSS, 1L);
}
updateInstanceStatesForNewConsumingSegment(instanceStatesMap,
previousConsumingSegment, latestSegmentName,
@@ -1232,9 +1228,8 @@ public class PinotLLCRealtimeSegmentManager {
CommittingSegmentDescriptor committingSegmentDescriptor =
new
CommittingSegmentDescriptor(latestSegmentZKMetadata.getSegmentName(),
startOffset.toString(), 0);
- createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
currentTimeMs,
- committingSegmentDescriptor, latestSegmentZKMetadata,
instancePartitions, numPartitions,
- numReplicas, newPartitionGroupMetadataList);
+ createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
currentTimeMs, committingSegmentDescriptor,
+ latestSegmentZKMetadata, instancePartitions, numPartitions,
numReplicas, newPartitionGroupMetadataList);
String newSegmentName = newLLCSegmentName.getSegmentName();
updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
instancePartitionsMap);
@@ -1338,13 +1333,9 @@ public class PinotLLCRealtimeSegmentManager {
* TODO: Add an on-demand way to upload LLC segment to deep store for a
specific table.
*/
public void uploadToDeepStoreIfMissing(TableConfig tableConfig,
List<SegmentZKMetadata> segmentsZKMetadata) {
- String realtimeTableName = tableConfig.getTableName();
+ Preconditions.checkState(!_isStopping, "Segment manager is stopping");
- if (_isStopping) {
- LOGGER.info("Skipped fixing deep store copy of LLC segments for table
{}, because segment manager is stopping.",
- realtimeTableName);
- return;
- }
+ String realtimeTableName = tableConfig.getTableName();
// Use this retention value to avoid the data racing between segment
upload and retention management.
RetentionStrategy retentionStrategy = null;
@@ -1367,11 +1358,8 @@ public class PinotLLCRealtimeSegmentManager {
// servers. We may need to rate control the upload request if it is
changed to be in parallel.
String segmentName = segmentZKMetadata.getSegmentName();
try {
- if (!LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- continue;
- }
- // Only fix the committed / externally uploaded llc segment without
deep store copy
- if (segmentZKMetadata.getStatus() == Status.IN_PROGRESS
+ // Only fix the committed (DONE) LLC segment without deep store copy
(empty download URL)
+ if (!SegmentName.isLowLevelConsumerSegmentName(segmentName) ||
segmentZKMetadata.getStatus() != Status.DONE
||
!CommonConstants.Segment.METADATA_URI_FOR_PEER_DOWNLOAD.equals(segmentZKMetadata.getDownloadUrl()))
{
continue;
}
@@ -1386,8 +1374,9 @@ public class PinotLLCRealtimeSegmentManager {
List<URI> peerSegmentURIs =
PeerServerSegmentFinder.getPeerServerURIs(segmentName,
CommonConstants.HTTP_PROTOCOL, _helixManager);
if (peerSegmentURIs.isEmpty()) {
- throw new IllegalStateException(String
- .format("Failed to upload segment %s to deep store because no
online replica is found", segmentName));
+ throw new IllegalStateException(
+ String.format("Failed to upload segment %s to deep store because
no online replica is found",
+ segmentName));
}
// Randomly ask one server to upload
@@ -1404,8 +1393,8 @@ public class PinotLLCRealtimeSegmentManager {
LOGGER.info("Successfully uploaded LLC segment {} to deep store with
download url: {}", segmentName,
segmentDownloadUrl);
} catch (Exception e) {
- _controllerMetrics
- .addMeteredTableValue(realtimeTableName,
ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
+ _controllerMetrics.addMeteredTableValue(realtimeTableName,
+ ControllerMeter.LLC_SEGMENTS_DEEP_STORE_UPLOAD_RETRY_ERROR, 1L);
LOGGER.error("Failed to upload segment {} to deep store", segmentName,
e);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
index 597eb1255b..7f5f7185a9 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionManager.java
@@ -118,11 +118,6 @@ public class SegmentCompletionManager {
return System.currentTimeMillis();
}
- public StreamPartitionMsgOffsetFactory
getStreamPartitionMsgOffsetFactory(String segmentName) {
- final LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
- return getStreamPartitionMsgOffsetFactory(llcSegmentName);
- }
-
protected StreamPartitionMsgOffsetFactory
getStreamPartitionMsgOffsetFactory(LLCSegmentName llcSegmentName) {
final String rawTableName = llcSegmentName.getTableName();
TableConfig tableConfig =
_segmentManager.getTableConfig(TableNameBuilder.REALTIME.tableNameWithType(rawTableName));
@@ -149,7 +144,7 @@ public class SegmentCompletionManager {
final String realtimeTableName =
TableNameBuilder.REALTIME.tableNameWithType(segmentName.getTableName());
SegmentZKMetadata segmentMetadata =
_segmentManager.getSegmentZKMetadata(realtimeTableName,
segmentName.getSegmentName(), null);
- if
(segmentMetadata.getStatus().equals(CommonConstants.Segment.Realtime.Status.DONE))
{
+ if (segmentMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
// Best to go through the state machine for this case as well, so
that all code regarding state handling is
// in one place
// Also good for synchronization, because it is possible that
multiple threads take this path, and we don't
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
index 11e67b1663..0afcb3aa59 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.controller.helix.core.util;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
+import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
@@ -41,8 +43,7 @@ public class ZKMetadataUtils {
String downloadUrl, @Nullable String crypterName, long
segmentSizeInBytes) {
SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(segmentMetadata.getName());
updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypterName,
- segmentSizeInBytes);
- segmentZKMetadata.setPushTime(System.currentTimeMillis());
+ segmentSizeInBytes, true);
return segmentZKMetadata;
}
@@ -52,12 +53,18 @@ public class ZKMetadataUtils {
public static void refreshSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypterName, long segmentSizeInBytes) {
updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypterName,
- segmentSizeInBytes);
- segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+ segmentSizeInBytes, false);
}
private static void updateSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
- SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypterName, long segmentSizeInBytes) {
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypterName, long segmentSizeInBytes,
+ boolean newSegment) {
+ if (newSegment) {
+ segmentZKMetadata.setPushTime(System.currentTimeMillis());
+ } else {
+ segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+ }
+
if (segmentMetadata.getTimeInterval() != null) {
segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
@@ -67,11 +74,8 @@ public class ZKMetadataUtils {
segmentZKMetadata.setEndTime(-1);
segmentZKMetadata.setTimeUnit(null);
}
- if (segmentMetadata.getVersion() != null) {
- segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
- } else {
- segmentZKMetadata.setIndexVersion(null);
- }
+ segmentZKMetadata.setIndexVersion(
+ segmentMetadata.getVersion() != null ?
segmentMetadata.getVersion().name() : null);
segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
@@ -90,11 +94,8 @@ public class ZKMetadataUtils {
columnPartitionMap.put(column, columnPartitionMetadata);
}
});
- if (!columnPartitionMap.isEmpty()) {
- segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(columnPartitionMap));
- } else {
- segmentZKMetadata.setPartitionMetadata(null);
- }
+ segmentZKMetadata.setPartitionMetadata(
+ !columnPartitionMap.isEmpty() ? new
SegmentPartitionMetadata(columnPartitionMap) : null);
// Update custom metadata
// NOTE: Do not remove existing keys because they can be set by the HTTP
header from the segment upload request
@@ -110,6 +111,12 @@ public class ZKMetadataUtils {
if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
+ // For new segment, start/end offset must exist if the segment name
follows LLC segment name convention
+ if (newSegment &&
SegmentName.isLowLevelConsumerSegmentName(segmentMetadata.getName())) {
+ Preconditions.checkArgument(segmentMetadata.getStartOffset() != null
&& segmentMetadata.getEndOffset() != null,
+ "New uploaded LLC segment must have start/end offset in the
segment metadata");
+ }
+
// NOTE:
// - If start/end offset is available in the uploaded segment, update
them in the segment ZK metadata
// - If not, keep the existing start/end offset in the segment ZK
metadata unchanged
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 5831c0617a..b772fcabd1 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -34,9 +34,7 @@ import
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.stream.PartitionLevelStreamConfig;
-import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -90,41 +88,39 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
@Override
protected void processTable(String tableNameWithType, Context context) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == TableType.REALTIME) {
+ if (!TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+ return;
+ }
- TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
- return;
- }
+ TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ LOGGER.warn("Failed to find table config for table: {}, skipping
validation", tableNameWithType);
+ return;
+ }
+ PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (context._runSegmentLevelValidation) {
- runSegmentLevelValidation(tableConfig);
- }
+ if (context._runSegmentLevelValidation) {
+ runSegmentLevelValidation(tableConfig, streamConfig);
+ }
- PartitionLevelStreamConfig streamConfig = new
PartitionLevelStreamConfig(tableConfig.getTableName(),
- IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (streamConfig.hasLowLevelConsumerType()) {
- _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
- streamConfig, context._recreateDeletedConsumingSegment);
- }
+ if (streamConfig.hasLowLevelConsumerType()) {
+ _llcRealtimeSegmentManager.ensureAllPartitionsConsuming(tableConfig,
streamConfig,
+ context._recreateDeletedConsumingSegment);
}
}
- private void runSegmentLevelValidation(TableConfig tableConfig) {
+ private void runSegmentLevelValidation(TableConfig tableConfig,
PartitionLevelStreamConfig streamConfig) {
String realtimeTableName = tableConfig.getTableName();
List<SegmentZKMetadata> segmentsZKMetadata =
_pinotHelixResourceManager.getSegmentsZKMetadata(realtimeTableName);
- boolean countHLCSegments = true; // false if this table has ONLY LLC
segments (i.e. fully migrated)
- StreamConfig streamConfig =
- new StreamConfig(realtimeTableName,
IngestionConfigUtils.getStreamConfigMap(tableConfig));
- if (streamConfig.hasLowLevelConsumerType() &&
!streamConfig.hasHighLevelConsumerType()) {
- countHLCSegments = false;
- }
- // Update the gauge to contain the total document count in the segments
-
_validationMetrics.updateTotalDocumentCountGauge(tableConfig.getTableName(),
- computeRealtimeTotalDocumentInSegments(segmentsZKMetadata,
countHLCSegments));
+ // Update the total document count gauge
+ // Count HLC segments if high level consumer is configured
+ boolean countHLCSegments = streamConfig.hasHighLevelConsumerType();
+ _validationMetrics.updateTotalDocumentCountGauge(realtimeTableName,
+ computeTotalDocumentCount(segmentsZKMetadata, countHLCSegments));
+
+ // Check missing segments and upload them to the deep store
if (streamConfig.hasLowLevelConsumerType()
&&
_llcRealtimeSegmentManager.isDeepStoreLLCSegmentUploadRetryEnabled()) {
_llcRealtimeSegmentManager.uploadToDeepStoreIfMissing(tableConfig,
segmentsZKMetadata);
@@ -134,42 +130,42 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
@Override
protected void nonLeaderCleanup(List<String> tableNamesWithType) {
for (String tableNameWithType : tableNamesWithType) {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- if (tableType == TableType.REALTIME) {
+ if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
_validationMetrics.cleanupTotalDocumentCountGauge(tableNameWithType);
}
}
}
@VisibleForTesting
- static long computeRealtimeTotalDocumentInSegments(List<SegmentZKMetadata>
segmentsZKMetadata,
- boolean countHLCSegments) {
+ static long computeTotalDocumentCount(List<SegmentZKMetadata>
segmentsZKMetadata, boolean countHLCSegments) {
long numTotalDocs = 0;
-
- String groupId = "";
- for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
- String segmentName = segmentZKMetadata.getSegmentName();
- if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
- if (countHLCSegments) {
+ if (countHLCSegments) {
+ String groupId = null;
+ for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
- String segmentGroupIdName = hlcSegmentName.getGroupId();
-
- if (groupId.isEmpty()) {
- groupId = segmentGroupIdName;
- }
- // Discard all segments with different groupids as they are replicas
- if (groupId.equals(segmentGroupIdName) &&
segmentZKMetadata.getTotalDocs() >= 0) {
- numTotalDocs += segmentZKMetadata.getTotalDocs();
+ String segmentGroupId = hlcSegmentName.getGroupId();
+ if (groupId == null) {
+ groupId = segmentGroupId;
+ numTotalDocs = segmentZKMetadata.getTotalDocs();
+ } else {
+ // Discard all segments with different group id as they are
replicas
+ if (groupId.equals(segmentGroupId)) {
+ numTotalDocs += segmentZKMetadata.getTotalDocs();
+ }
}
}
- } else {
- // Low level segments
- if (!countHLCSegments) {
+ }
+ } else {
+ for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
+ String segmentName = segmentZKMetadata.getSegmentName();
+ if (!SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
+ // LLC segments or uploaded segments
numTotalDocs += segmentZKMetadata.getTotalDocs();
}
}
}
-
return numTotalDocs;
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index 1f29a8731d..28c33eb256 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -20,15 +20,24 @@ package org.apache.pinot.controller.api.upload;
import java.io.File;
import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.controller.ControllerConf;
+import
org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.ControllerTest;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
@@ -38,32 +47,57 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
+import static org.testng.Assert.*;
public class ZKOperatorTest {
- private static final String TABLE_NAME = "operatorTestTable";
- private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TABLE_NAME);
+ private static final String RAW_TABLE_NAME = "testTable";
+ private static final String OFFLINE_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME);
+ private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+ private static final String TIME_COLUMN = "timeColumn";
private static final String SEGMENT_NAME = "testSegment";
+ // NOTE: The FakeStreamConsumerFactory will create 2 stream partitions. Use
partition 2 to avoid conflict.
+ private static final String LLC_SEGMENT_NAME =
+ new LLCSegmentName(RAW_TABLE_NAME, 2, 0,
System.currentTimeMillis()).getSegmentName();
private static final ControllerTest TEST_INSTANCE =
ControllerTest.getInstance();
+ private PinotHelixResourceManager _resourceManager;
+
@BeforeClass
public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();
+ _resourceManager = TEST_INSTANCE.getHelixResourceManager();
+
+ Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+ .addDateTime(TIME_COLUMN, DataType.TIMESTAMP,
"1:MILLISECONDS:TIMESTAMP", "1:MILLISECONDS").build();
+ TableConfig offlineTableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+ TableConfig realtimeTableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN)
+
.setStreamConfigs(getStreamConfigs()).setLLC(true).setNumReplicas(1).build();
+
+ _resourceManager.addSchema(schema, false);
+ _resourceManager.addTable(offlineTableConfig);
+ _resourceManager.addTable(realtimeTableConfig);
+ }
- TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).build();
- TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
+ private Map<String, String> getStreamConfigs() {
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.topic.name", "kafkaTopic");
+ streamConfigs.put("stream.kafka.consumer.type", "simple");
+ streamConfigs.put("stream.kafka.decoder.class.name",
+ "org.apache.pinot.plugin.stream.kafka.KafkaAvroMessageDecoder");
+ streamConfigs.put("stream.kafka.consumer.factory.class.name",
+
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
+ return streamConfigs;
}
@Test
public void testCompleteSegmentOperations()
throws Exception {
- ZKOperator zkOperator = new
ZKOperator(TEST_INSTANCE.getHelixResourceManager(), mock(ControllerConf.class),
- mock(ControllerMetrics.class));
+ ZKOperator zkOperator = new ZKOperator(_resourceManager,
mock(ControllerConf.class), mock(ControllerMetrics.class));
+
SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
when(segmentMetadata.getCrc()).thenReturn("12345");
@@ -86,16 +120,14 @@ public class ZKOperatorTest {
// Wait for the segment Zk entry to be deleted.
TestUtils.waitForCondition(aVoid -> {
- SegmentZKMetadata segmentZKMetadata =
-
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
+ SegmentZKMetadata segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
return segmentZKMetadata == null;
}, 30_000L, "Failed to delete segmentZkMetadata.");
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, "downloadUrl", "crypter", 10,
true, true, httpHeaders);
- SegmentZKMetadata segmentZKMetadata =
-
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
+ SegmentZKMetadata segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
assertEquals(segmentZKMetadata.getCreationTime(), 123L);
@@ -132,8 +164,8 @@ public class ZKOperatorTest {
when(segmentMetadata.getIndexCreationTime()).thenReturn(456L);
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, "otherDownloadUrl",
"otherCrypter", 10, true, true, httpHeaders);
- segmentZKMetadata =
-
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
+
+ segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
// Push time should not change
@@ -155,8 +187,8 @@ public class ZKOperatorTest {
Thread.sleep(10);
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata,
null, null, "otherDownloadUrl",
"otherCrypter", 100, true, true, httpHeaders);
- segmentZKMetadata =
-
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TABLE_NAME,
SEGMENT_NAME);
+
+ segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 23456L);
// Push time should not change
@@ -169,6 +201,74 @@ public class ZKOperatorTest {
assertEquals(segmentZKMetadata.getSizeInBytes(), 100);
}
+ @Test
+ public void testPushToRealtimeTable()
+ throws Exception {
+ ZKOperator zkOperator = new ZKOperator(_resourceManager,
mock(ControllerConf.class), mock(ControllerMetrics.class));
+
+ SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
+ when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME);
+ when(segmentMetadata.getCrc()).thenReturn("12345");
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata,
null, null, "downloadUrl", null, 10,
+ true, true, mock(HttpHeaders.class));
+
+ SegmentZKMetadata segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
+ assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+ assertNull(segmentMetadata.getStartOffset());
+ assertNull(segmentMetadata.getEndOffset());
+
+ // Uploading a segment with LLC segment name but without start/end offset
should fail
+ when(segmentMetadata.getName()).thenReturn(LLC_SEGMENT_NAME);
+ when(segmentMetadata.getCrc()).thenReturn("23456");
+ try {
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME,
segmentMetadata, null, null, "downloadUrl", null, 10,
+ true, true, mock(HttpHeaders.class));
+ fail();
+ } catch (ControllerApplicationException e) {
+ assertEquals(e.getResponse().getStatus(),
Response.Status.BAD_REQUEST.getStatusCode());
+ }
+ assertNull(_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME,
LLC_SEGMENT_NAME));
+
+ // Uploading a segment with LLC segment name and start/end offset should
succeed
+ when(segmentMetadata.getStartOffset()).thenReturn("0");
+ when(segmentMetadata.getEndOffset()).thenReturn("1234");
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata,
null, null, "downloadUrl", null, 10,
+ true, true, mock(HttpHeaders.class));
+
+ segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
+ assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+ assertEquals(segmentZKMetadata.getStartOffset(), "0");
+ assertEquals(segmentZKMetadata.getEndOffset(), "1234");
+
+ // Refreshing a segment with LLC segment name but without start/end offset
should succeed
+ when(segmentMetadata.getCrc()).thenReturn("34567");
+ when(segmentMetadata.getStartOffset()).thenReturn(null);
+ when(segmentMetadata.getEndOffset()).thenReturn(null);
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata,
null, null, "downloadUrl", null, 10,
+ true, true, mock(HttpHeaders.class));
+
+ segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
+ assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+ assertEquals(segmentZKMetadata.getStartOffset(), "0");
+ assertEquals(segmentZKMetadata.getEndOffset(), "1234");
+
+ // Refreshing a segment with LLC segment name and start/end offset should
override the offsets
+ when(segmentMetadata.getCrc()).thenReturn("45678");
+ when(segmentMetadata.getStartOffset()).thenReturn("1234");
+ when(segmentMetadata.getEndOffset()).thenReturn("2345");
+ zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata,
null, null, "downloadUrl", null, 10,
+ true, true, mock(HttpHeaders.class));
+
+ segmentZKMetadata =
_resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
+ assertEquals(segmentZKMetadata.getStatus(), Status.UPLOADED);
+ assertEquals(segmentZKMetadata.getStartOffset(), "1234");
+ assertEquals(segmentZKMetadata.getEndOffset(), "2345");
+ }
+
@AfterClass
public void tearDown() {
TEST_INSTANCE.cleanup();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
index 711b552fee..204a2ff7dd 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java
@@ -221,8 +221,9 @@ public class PinotResourceManagerTest {
Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
for (String segment : segments) {
Integer partitionId;
- if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
- partitionId = new LLCSegmentName(segment).getPartitionGroupId();
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segment);
+ if (llcSegmentName != null) {
+ partitionId = llcSegmentName.getPartitionGroupId();
} else {
partitionId = segment2PartitionId.get(segment);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
index 53efb6ad9c..2ada570ac8 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/SegmentCompletionTest.java
@@ -1349,10 +1349,6 @@ public class SegmentCompletionTest {
return new LongMsgOffsetFactory();
}
- public StreamPartitionMsgOffsetFactory
getStreamPartitionMsgOffsetFactory(String segmentName) {
- return new LongMsgOffsetFactory();
- }
-
@Override
protected long getCurrentTimeMs() {
return _seconds * 1000L;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
index c981ed0f14..c371dc9dc2 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/ValidationManagerTest.java
@@ -36,12 +36,13 @@ import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
/**
@@ -54,29 +55,27 @@ public class ValidationManagerTest {
private static final String TEST_SEGMENT_NAME = "testSegment";
private static final int EXPECTED_VERSION = -1;
- private TableConfig _offlineTableConfig;
-
@BeforeClass
public void setUp()
throws Exception {
TEST_INSTANCE.setupSharedStateAndValidate();
- _offlineTableConfig =
+ TableConfig offlineTableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(TEST_TABLE_NAME).setNumReplicas(2).build();
- TEST_INSTANCE.getHelixResourceManager().addTable(_offlineTableConfig);
+ TEST_INSTANCE.getHelixResourceManager().addTable(offlineTableConfig);
}
@Test
public void testPushTimePersistence() {
SegmentMetadata segmentMetadata =
SegmentMetadataMockUtils.mockSegmentMetadata(TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
- TEST_INSTANCE.getHelixResourceManager()
- .addNewSegment(OFFLINE_TEST_TABLE_NAME, segmentMetadata,
"downloadUrl");
+
TEST_INSTANCE.getHelixResourceManager().addNewSegment(OFFLINE_TEST_TABLE_NAME,
segmentMetadata, "downloadUrl");
SegmentZKMetadata segmentZKMetadata =
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
long pushTime = segmentZKMetadata.getPushTime();
// Check that the segment has been pushed in the last 30 seconds
- Assert.assertTrue(System.currentTimeMillis() - pushTime < 30_000);
+ assertTrue(System.currentTimeMillis() - pushTime < 30_000);
// Check that there is no refresh time
assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
@@ -84,8 +83,8 @@ public class ValidationManagerTest {
// NOTE: In order to send the refresh message, the segment need to be in
the ExternalView
String offlineTableName =
TableNameBuilder.OFFLINE.tableNameWithType(TEST_TABLE_NAME);
TestUtils.waitForCondition(aVoid -> {
- ExternalView externalView = TEST_INSTANCE.getHelixAdmin()
- .getResourceExternalView(TEST_INSTANCE.getHelixClusterName(),
offlineTableName);
+ ExternalView externalView =
+
TEST_INSTANCE.getHelixAdmin().getResourceExternalView(TEST_INSTANCE.getHelixClusterName(),
offlineTableName);
return externalView != null &&
externalView.getPartitionSet().contains(TEST_SEGMENT_NAME);
}, 30_000L, "Failed to find the segment in the ExternalView");
Mockito.when(segmentMetadata.getCrc()).thenReturn(Long.toString(System.nanoTime()));
@@ -94,15 +93,15 @@ public class ValidationManagerTest {
segmentZKMetadata =
TEST_INSTANCE.getHelixResourceManager().getSegmentZKMetadata(OFFLINE_TEST_TABLE_NAME,
TEST_SEGMENT_NAME);
+ assertNotNull(segmentZKMetadata);
// Check that the segment still has the same push time
assertEquals(segmentZKMetadata.getPushTime(), pushTime);
// Check that the refresh time is in the last 30 seconds
- Assert.assertTrue(System.currentTimeMillis() -
segmentZKMetadata.getRefreshTime() < 30_000L);
+ assertTrue(System.currentTimeMillis() - segmentZKMetadata.getRefreshTime()
< 30_000L);
}
@Test
- public void testTotalDocumentCountRealTime()
- throws Exception {
+ public void testTotalDocumentCountRealTime() {
// Create a bunch of dummy segments
final String group1 = TEST_TABLE_NAME + "_REALTIME_1466446700000_34";
final String group2 = TEST_TABLE_NAME + "_REALTIME_1466446700000_17";
@@ -118,17 +117,17 @@ public class ValidationManagerTest {
// This should get ignored in the count as it belongs to a different group
id
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(segmentName4,
20));
-
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentsZKMetadata,
true), 60);
+
assertEquals(RealtimeSegmentValidationManager.computeTotalDocumentCount(segmentsZKMetadata,
true), 60);
- // Now add some low level segment names
+ // Now add some LLC segments (both committed and uploaded)
String segmentName5 = new LLCSegmentName(TEST_TABLE_NAME, 1, 0,
1000).getSegmentName();
String segmentName6 = new LLCSegmentName(TEST_TABLE_NAME, 2, 27,
10000).getSegmentName();
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(segmentName5,
10));
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(segmentName6,
5));
+
segmentsZKMetadata.add(SegmentMetadataMockUtils.mockSegmentZKMetadata(TEST_SEGMENT_NAME,
15));
// Only the LLC segments should get counted.
-
assertEquals(RealtimeSegmentValidationManager.computeRealtimeTotalDocumentInSegments(segmentsZKMetadata,
false),
- 15);
+
assertEquals(RealtimeSegmentValidationManager.computeTotalDocumentCount(segmentsZKMetadata,
false), 30);
}
@Test
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 198023632c..d8db5624b9 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -290,61 +290,52 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
// of the index directory and loading segment from it
LoaderUtils.reloadFailureRecovery(segmentDir);
- boolean isLLCSegment =
SegmentName.isLowLevelConsumerSegmentName(segmentName);
- if (segmentDir.exists()) {
- // Segment already exists on disk
- if (segmentZKMetadata.getStatus() == Status.DONE ||
segmentZKMetadata.getStatus() == Status.UPLOADED) {
- // Metadata has been committed, load the local segment
+ boolean isHLCSegment =
SegmentName.isHighLevelConsumerSegmentName(segmentName);
+ if (segmentZKMetadata.getStatus().isCompleted()) {
+ if (segmentDir.exists()) {
+ // Local segment exists, try to load it
try {
addSegment(ImmutableSegmentLoader.load(segmentDir,
indexLoadingConfig, schema));
return;
} catch (Exception e) {
- if (isLLCSegment) {
- // For LLC and segments, delete the local copy and download a new
copy from the controller
- FileUtils.deleteQuietly(segmentDir);
+ if (!isHLCSegment) {
+ // For LLC and uploaded segments, delete the local copy and
download a new copy
_logger.error("Caught exception while loading segment: {},
downloading a new copy", segmentName, e);
+ FileUtils.deleteQuietly(segmentDir);
} else {
// For HLC segments, throw out the exception because there is no
way to recover (controller does not have a
// copy of the segment)
- throw e;
+ throw new RuntimeException("Failed to load local HLC segment: " +
segmentName, e);
}
}
} else {
- // Metadata has not been committed, delete the local segment
- FileUtils.deleteQuietly(segmentDir);
+ if (isHLCSegment) {
+ throw new RuntimeException("Failed to find local copy for committed
HLC segment: " + segmentName);
+ }
}
- } else if (segmentZKMetadata.getStatus() == Status.UPLOADED) {
- // The segment is uploaded to an upsert enabled realtime table. Download
the segment and load.
- String downloadUrl = segmentZKMetadata.getDownloadUrl();
- Preconditions.checkNotNull(downloadUrl, "Upload segment metadata has no
download url");
- downloadSegmentFromDeepStore(segmentName, indexLoadingConfig,
downloadUrl);
- _logger
- .info("Downloaded, untarred and add segment {} of table {} from {}",
segmentName, tableConfig.getTableName(),
- downloadUrl);
+ // Local segment doesn't exist or cannot load, download a new copy
+ downloadAndReplaceSegment(segmentName, segmentZKMetadata,
indexLoadingConfig, tableConfig);
return;
+ } else {
+ // Metadata has not been committed, delete the local segment if exists
+ FileUtils.deleteQuietly(segmentDir);
}
- // Start a new consuming segment or download the segment from the
controller
-
+ // Start a new consuming segment
if (!isValid(schema, tableConfig.getIndexingConfig())) {
_logger.error("Not adding segment {}", segmentName);
throw new RuntimeException("Mismatching schema/table config for " +
_tableNameWithType);
}
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema,
segmentName);
- if (isLLCSegment) {
- if (segmentZKMetadata.getStatus() == Status.DONE) {
- downloadAndReplaceSegment(segmentName, segmentZKMetadata,
indexLoadingConfig, tableConfig);
- return;
- }
-
+ if (!isHLCSegment) {
// Generates only one semaphore for every partitionGroupId
LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
int partitionGroupId = llcSegmentName.getPartitionGroupId();
Semaphore semaphore =
_partitionGroupIdToSemaphoreMap.computeIfAbsent(partitionGroupId, k -> new
Semaphore(1));
PartitionUpsertMetadataManager partitionUpsertMetadataManager =
- _tableUpsertMetadataManager != null ? _tableUpsertMetadataManager
- .getOrCreatePartitionManager(partitionGroupId) : null;
+ _tableUpsertMetadataManager != null ?
_tableUpsertMetadataManager.getOrCreatePartitionManager(
+ partitionGroupId) : null;
segmentDataManager =
new LLRealtimeSegmentDataManager(segmentZKMetadata, tableConfig,
this, _indexDir.getAbsolutePath(),
indexLoadingConfig, schema, llcSegmentName, semaphore,
_serverMetrics, partitionUpsertMetadataManager);
@@ -419,7 +410,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Override
protected boolean allowDownload(String segmentName, SegmentZKMetadata
zkMetadata) {
- // Only LLC immutable segment allows download.
+ // Cannot download HLC segment or consuming segment
if (SegmentName.isHighLevelConsumerSegmentName(segmentName) ||
zkMetadata.getStatus() == Status.IN_PROGRESS) {
return false;
}
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
index c96d5d4791..76d8333672 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtimetoofflinesegments/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -117,18 +117,18 @@ public class RealtimeToOfflineSegmentsTaskGenerator
extends BaseTaskGenerator {
continue;
}
- // Get all segment metadata for completed segments (DONE status).
+ // Get all segment metadata for completed segments (DONE/UPLOADED
status).
List<SegmentZKMetadata> completedSegmentsZKMetadata = new ArrayList<>();
- Map<Integer, String> partitionToLatestCompletedSegmentName = new
HashMap<>();
+ Map<Integer, String> partitionToLatestLLCSegmentName = new HashMap<>();
Set<Integer> allPartitions = new HashSet<>();
- getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata,
partitionToLatestCompletedSegmentName,
+ getCompletedSegmentsInfo(realtimeTableName, completedSegmentsZKMetadata,
partitionToLatestLLCSegmentName,
allPartitions);
if (completedSegmentsZKMetadata.isEmpty()) {
LOGGER.info("No realtime-completed segments found for table: {},
skipping task generation: {}",
realtimeTableName, taskType);
continue;
}
- allPartitions.removeAll(partitionToLatestCompletedSegmentName.keySet());
+ allPartitions.removeAll(partitionToLatestLLCSegmentName.keySet());
if (!allPartitions.isEmpty()) {
LOGGER.info(
"Partitions: {} have no completed segments. Table: {} is not ready
for {}. Skipping task generation.",
@@ -158,7 +158,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends
BaseTaskGenerator {
// (exclusive)
List<String> segmentNames = new ArrayList<>();
List<String> downloadURLs = new ArrayList<>();
- Set<String> lastCompletedSegmentPerPartition = new
HashSet<>(partitionToLatestCompletedSegmentName.values());
+ Set<String> lastLLCSegmentPerPartition = new
HashSet<>(partitionToLatestLLCSegmentName.values());
boolean skipGenerate = false;
while (true) {
// Check that execution window is older than bufferTime
@@ -180,7 +180,7 @@ public class RealtimeToOfflineSegmentsTaskGenerator extends
BaseTaskGenerator {
// If last completed segment is being used, make sure that segment
crosses over end of window.
// In the absence of this check, CONSUMING segments could contain
some portion of the window. That data
// would be skipped forever.
- if (lastCompletedSegmentPerPartition.contains(segmentName) &&
segmentEndTimeMs < windowEndMs) {
+ if (lastLLCSegmentPerPartition.contains(segmentName) &&
segmentEndTimeMs < windowEndMs) {
LOGGER.info("Window data overflows into CONSUMING segments for
partition of segment: {}. Skipping task "
+ "generation: {}", segmentName, taskType);
skipGenerate = true;
@@ -243,41 +243,44 @@ public class RealtimeToOfflineSegmentsTaskGenerator
extends BaseTaskGenerator {
}
/**
- * Fetch completed (non-consuming) segment and partition information
+ * Fetch completed (DONE/UPLOADED) segment and partition information
+ *
* @param realtimeTableName the realtime table name
- * @param completedSegmentsZKMetadata list for collecting the completed
segments ZK metadata
- * @param partitionToLatestCompletedSegmentName map for collecting the
partitionId to the latest completed segment
- * name
+ * @param completedSegmentsZKMetadata list for collecting the completed
(DONE/UPLOADED) segments ZK metadata
+ * @param partitionToLatestLLCSegmentName map for collecting the partitionId
to the latest LLC segment name
* @param allPartitions set for collecting all partition ids
*/
private void getCompletedSegmentsInfo(String realtimeTableName,
List<SegmentZKMetadata> completedSegmentsZKMetadata,
- Map<Integer, String> partitionToLatestCompletedSegmentName, Set<Integer>
allPartitions) {
+ Map<Integer, String> partitionToLatestLLCSegmentName, Set<Integer>
allPartitions) {
List<SegmentZKMetadata> segmentsZKMetadata =
_clusterInfoAccessor.getSegmentsZKMetadata(realtimeTableName);
Map<Integer, LLCSegmentName> latestLLCSegmentNameMap = new HashMap<>();
for (SegmentZKMetadata segmentZKMetadata : segmentsZKMetadata) {
- LLCSegmentName llcSegmentName = new
LLCSegmentName(segmentZKMetadata.getSegmentName());
- allPartitions.add(llcSegmentName.getPartitionGroupId());
-
- if (segmentZKMetadata.getStatus().equals(Segment.Realtime.Status.DONE)) {
+ Segment.Realtime.Status status = segmentZKMetadata.getStatus();
+ if (status.isCompleted()) {
completedSegmentsZKMetadata.add(segmentZKMetadata);
- latestLLCSegmentNameMap.compute(llcSegmentName.getPartitionGroupId(),
- (partitionGroupId, latestLLCSegmentName) -> {
- if (latestLLCSegmentName == null) {
- return llcSegmentName;
- } else {
- if (llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber()) {
- return llcSegmentName;
- } else {
- return latestLLCSegmentName;
- }
- }
- });
+ }
+
+ // Skip UPLOADED segments that don't conform to the LLC segment name
+ LLCSegmentName llcSegmentName =
LLCSegmentName.of(segmentZKMetadata.getSegmentName());
+ if (llcSegmentName != null) {
+ int partitionId = llcSegmentName.getPartitionGroupId();
+ allPartitions.add(partitionId);
+ if (status.isCompleted()) {
+ latestLLCSegmentNameMap.compute(partitionId, (k,
latestLLCSegmentName) -> {
+ if (latestLLCSegmentName == null
+ || llcSegmentName.getSequenceNumber() >
latestLLCSegmentName.getSequenceNumber()) {
+ return llcSegmentName;
+ } else {
+ return latestLLCSegmentName;
+ }
+ });
+ }
}
}
for (Map.Entry<Integer, LLCSegmentName> entry :
latestLLCSegmentNameMap.entrySet()) {
- partitionToLatestCompletedSegmentName.put(entry.getKey(),
entry.getValue().getSegmentName());
+ partitionToLatestLLCSegmentName.put(entry.getKey(),
entry.getValue().getSegmentName());
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
index 12614e2718..861fa953a1 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/io/readerwriter/RealtimeIndexOffHeapMemoryManager.java
@@ -51,8 +51,8 @@ public abstract class RealtimeIndexOffHeapMemoryManager
implements PinotDataBuff
protected RealtimeIndexOffHeapMemoryManager(ServerMetrics serverMetrics,
String segmentName) {
_serverMetrics = serverMetrics;
_segmentName = segmentName;
- if (SegmentName.isLowLevelConsumerSegmentName(segmentName)) {
- LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+ if (llcSegmentName != null) {
_tableName = llcSegmentName.getTableName();
} else if (SegmentName.isHighLevelConsumerSegmentName(segmentName)) {
HLCSegmentName hlcSegmentName = new HLCSegmentName(segmentName);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index be0eeb3e00..0b6426f92a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -544,11 +544,16 @@ public class CommonConstants {
public static class Segment {
public static class Realtime {
public enum Status {
- // Means the segment is in CONSUMING state.
- IN_PROGRESS, // Means the segment is in ONLINE state (segment
completed consuming and has been saved in
- // segment store).
- DONE, // Means the segment is uploaded to a Pinot controller by an
external party.
- UPLOADED
+ IN_PROGRESS, // The segment is still consuming data
+ DONE, // The segment has finished consumption and has been committed
to the segment store
+ UPLOADED; // The segment is uploaded by an external party
+
+ /**
+ * Returns {@code true} if the segment is completed (DONE/UPLOADED),
{@code false} otherwise.
+ */
+ public boolean isCompleted() {
+ return this != IN_PROGRESS;
+ }
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]