Jackie-Jiang commented on code in PR #8653:
URL: https://github.com/apache/pinot/pull/8653#discussion_r869672337
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ZKMetadataUtils.java:
##########
@@ -20,58 +20,106 @@
import java.util.HashMap;
import java.util.Map;
+import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
-import
org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.segment.spi.ColumnMetadata;
import org.apache.pinot.segment.spi.SegmentMetadata;
-import org.apache.pinot.segment.spi.creator.SegmentVersion;
-import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.partition.PartitionFunction;
import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class ZKMetadataUtils {
private ZKMetadataUtils() {
}
- public static void updateSegmentMetadata(SegmentZKMetadata
segmentZKMetadata, SegmentMetadata segmentMetadata) {
- SegmentVersion segmentVersion = segmentMetadata.getVersion();
- if (segmentVersion != null) {
- segmentZKMetadata.setIndexVersion(segmentVersion.name());
- }
+ /**
+ * Creates the segment ZK metadata for a new segment.
+ */
+ public static SegmentZKMetadata createSegmentZKMetadata(String
tableNameWithType, SegmentMetadata segmentMetadata,
+ String downloadUrl, @Nullable String crypter, long segmentSizeInBytes) {
+ SegmentZKMetadata segmentZKMetadata = new
SegmentZKMetadata(segmentMetadata.getName());
+ updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypter,
+ segmentSizeInBytes);
+ segmentZKMetadata.setPushTime(System.currentTimeMillis());
+ return segmentZKMetadata;
+ }
+
+ /**
+ * Refreshes the segment ZK metadata for a segment being replaced.
+ */
+ public static void refreshSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypter, long segmentSizeInBytes) {
+ updateSegmentZKMetadata(tableNameWithType, segmentZKMetadata,
segmentMetadata, downloadUrl, crypter,
+ segmentSizeInBytes);
+ segmentZKMetadata.setRefreshTime(System.currentTimeMillis());
+ }
+
+ private static void updateSegmentZKMetadata(String tableNameWithType,
SegmentZKMetadata segmentZKMetadata,
+ SegmentMetadata segmentMetadata, String downloadUrl, @Nullable String
crypter, long segmentSizeInBytes) {
if (segmentMetadata.getTimeInterval() != null) {
segmentZKMetadata.setStartTime(segmentMetadata.getStartTime());
segmentZKMetadata.setEndTime(segmentMetadata.getEndTime());
segmentZKMetadata.setTimeUnit(segmentMetadata.getTimeUnit());
+ } else {
+ segmentZKMetadata.setStartTime(-1);
+ segmentZKMetadata.setEndTime(-1);
+ segmentZKMetadata.setTimeUnit(null);
+ }
+ if (segmentMetadata.getVersion() != null) {
+ segmentZKMetadata.setIndexVersion(segmentMetadata.getVersion().name());
+ } else {
+ segmentZKMetadata.setIndexVersion(null);
}
segmentZKMetadata.setTotalDocs(segmentMetadata.getTotalDocs());
- segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ segmentZKMetadata.setSizeInBytes(segmentSizeInBytes);
segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc()));
- SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier =
- new
SegmentZKMetadataCustomMapModifier(SegmentZKMetadataCustomMapModifier.ModifyMode.UPDATE,
- segmentZKMetadata.getCustomMap());
-
segmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap(segmentMetadata.getCustomMap()));
+ segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime());
+ segmentZKMetadata.setDownloadUrl(downloadUrl);
+ segmentZKMetadata.setCrypterName(crypter);
- // Extract column partition metadata (if any), and set it into segment ZK
metadata.
+ // Set partition metadata
Map<String, ColumnPartitionMetadata> columnPartitionMap = new HashMap<>();
- if (segmentMetadata instanceof SegmentMetadataImpl) {
- for (Map.Entry<String, ColumnMetadata> entry :
segmentMetadata.getColumnMetadataMap().entrySet()) {
- String column = entry.getKey();
- ColumnMetadata columnMetadata = entry.getValue();
- PartitionFunction partitionFunction =
columnMetadata.getPartitionFunction();
-
- if (partitionFunction != null) {
- ColumnPartitionMetadata columnPartitionMetadata =
- new ColumnPartitionMetadata(partitionFunction.getName(),
partitionFunction.getNumPartitions(),
- columnMetadata.getPartitions(),
partitionFunction.getFunctionConfig());
- columnPartitionMap.put(column, columnPartitionMetadata);
- }
+ for (Map.Entry<String, ColumnMetadata> entry :
segmentMetadata.getColumnMetadataMap().entrySet()) {
+ String column = entry.getKey();
+ ColumnMetadata columnMetadata = entry.getValue();
+ PartitionFunction partitionFunction =
columnMetadata.getPartitionFunction();
+ if (partitionFunction != null) {
+ ColumnPartitionMetadata columnPartitionMetadata =
+ new ColumnPartitionMetadata(partitionFunction.getName(),
partitionFunction.getNumPartitions(),
+ columnMetadata.getPartitions(),
partitionFunction.getFunctionConfig());
+ columnPartitionMap.put(column, columnPartitionMetadata);
}
}
-
if (!columnPartitionMap.isEmpty()) {
segmentZKMetadata.setPartitionMetadata(new
SegmentPartitionMetadata(columnPartitionMap));
+ } else {
+ segmentZKMetadata.setPartitionMetadata(null);
+ }
+
+ // Update custom metadata
+ // NOTE: Do not remove existing keys because they can be set by the HTTP
header from the segment upload request
+ Map<String, String> customMap = segmentZKMetadata.getCustomMap();
+ if (customMap == null) {
+ customMap = segmentMetadata.getCustomMap();
+ } else {
+ customMap.putAll(segmentMetadata.getCustomMap());
+ }
+ segmentZKMetadata.setCustomMap(customMap);
+
+ // Set fields specific to realtime table
+ if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
+
segmentZKMetadata.setStatus(CommonConstants.Segment.Realtime.Status.UPLOADED);
+
+ // NOTE: Keep offset as is if it is not explicitly set in the segment
metadata
+ if (segmentMetadata.getStartOffset() != null) {
+ segmentZKMetadata.setStartOffset(segmentMetadata.getStartOffset());
+ }
+ if (segmentMetadata.getEndOffset() != null) {
+ segmentZKMetadata.setEndOffset(segmentMetadata.getEndOffset());
+ }
Review Comment:
This logic is for the refresh segment case. If a user tries to refresh an LLC
segment with a generated segment (e.g. segment purge), we want to keep the
start/end offset unchanged. Another way is to ensure the generated segment has
the proper start/end offset set, but that adds an extra dependency on the segment
creation logic and makes it prone to bugs. I don't anticipate a scenario where
a user wants to clear the start/end offset.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]