klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625083242

##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
+
+/**
+ * Implementation for generating segment names of the format UploadedRealtimeSegmentName:
+ * uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}
+ *
+ * <p>This naming convention is adopted to represent uploaded segments to a realtime table. The semantic is similar
+ * to LLCSegmentName. Scenarios where this naming convention can be preferred is:
+ * <li> Generating segments from a batch workload
+ * <li> Minion based segment transformations
+ */
+public class UploadedRealtimeSegmentNameGenerator implements SegmentNameGenerator {
+
+  private static final String SEGMENT_NAME_PREFIX = "uploaded";
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  // creation time must be in long and milliseconds since epoch to be consistent with creation.meta time for valid
+  // comparison in segment replace flow.
+  private final long _creationTimeMillis;
+  @Nullable
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int partitionId, long creationTimeMillis,
+      String suffix) {
+    Preconditions.checkState(creationTimeMillis > 0, "Creation time must be positive");

Review Comment:
   nit: use checkArgument() here, since these are parameters passed in by the caller. Also check that partitionId is >= 0?

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -158,6 +158,52 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
     }
   }
 
+  /**
+   * <li> When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   * <li> When the replacing segment and current segment are of {@link UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   * <li> When either is of type {@link UploadedRealtimeSegmentName} then resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record in current segment
+   */
+  private boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
+      long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    LLCSegmentName currentLLCSegmentName = LLCSegmentName.of(currentSegmentName);
+    if (llcSegmentName != null && currentLLCSegmentName != null) {
+      return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
+    }
+
+    UploadedRealtimeSegmentName uploadedSegmentName = UploadedRealtimeSegmentName.of(segmentName);
+    UploadedRealtimeSegmentName currentUploadedSegmentName = UploadedRealtimeSegmentName.of(currentSegmentName);
+
+    if (uploadedSegmentName != null && currentUploadedSegmentName != null) {
+      int comparisonResult =
+          Integer.compare(uploadedSegmentName.getSequenceId(), currentUploadedSegmentName.getSequenceId());
+      if (comparisonResult == 0) {
+        Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);

Review Comment:
   Missed `return`? BTW, this can be inlined into `return comparisonResult == 0 ? Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs) > 0 : comparisonResult > 0`.

##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -378,6 +379,115 @@ public void testAddReplaceRemoveSegmentWithRecordDelete()
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
   }
 
+  @Test
+  public void verifyAddReplaceUploadedSegment()
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(HashFunction.NONE).build());
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, primaryKeys1, segmentMetadata, null);
+    List<RecordInfo> recordInfoList1;
+    // get recordInfo by iterating all records.
+    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+
+    // Add the second segment of uploaded name format with higher creation time
+    numRecords = 5;
+    primaryKeys = new int[]{0, 1, 2, 3, 0};
+    timestamps = new int[]{100, 100, 120, 80, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment2 =
+        mockUploadedImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1010L);
+    List<RecordInfo> recordInfoList2;
+    // get recordInfo by iterating all records.
+    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
+    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, recordInfoList2.iterator());
+    trackedSegments.add(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}
+    // uploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, uploadedSegment2, 2, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 3, 80, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+
+    // replace uploadedSegment2 with higher creation time
+    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newUploadedSegment2 =
+        mockUploadedImmutableSegment(2, newValidDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1020L);
+    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, null, recordInfoList2.iterator(),
+        uploadedSegment2);
+    trackedSegments.add(newUploadedSegment2);
+    trackedSegments.remove(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}
+    // newUploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, newUploadedSegment2, 2, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 3, 80, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+
+    // add uploaded segment3 with same creation time as segment1 and higher sequence id than newUploadedSegment2
+    numRecords = 2;
+    primaryKeys = new int[]{0, 1};
+    timestamps = new int[]{100, 120};
+    ThreadSafeMutableRoaringBitmap validDocIds3 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment3 =
+        mockUploadedImmutableSegment(3, validDocIds3, null, getPrimaryKeyList(numRecords, primaryKeys), 1000L);
+    List<RecordInfo> recordInfoList3;
+    // get recordInfo by iterating all records.
+    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
+    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, recordInfoList3.iterator());
+    trackedSegments.add(uploadedSegment3);
+
+    // newUploadedSegment2: 2 -> {2, 120}, 3 -> {3, 80}
+    // segment3: 0 -> {0, 100}, 1 -> {1, 120}

Review Comment:
   For `0 -> {0, 100}`, segment3 won over newUploadedSegment2? But segment3 has the smaller ctime (1000 vs. 1020). So I think we probably want to compare ctime first and then seqId for UploadedRealtimeSegmentName: seqId is used to break ties (or avoid name conflicts) among uploaded realtime segments with the same creation time, and it isn't really comparable across uploaded segments with different ctimes.

-- 
This is an automated message from the Apache Git Service.
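For the first review comment, here is a minimal sketch of the suggested parameter checks. It assumes only the two constraints discussed above (creation time positive, partition id non-negative). The class `UploadedNameArgsSketch` and its `validate` method are hypothetical, and the local `checkArgument` is a stand-in that mirrors Guava's `Preconditions.checkArgument` so the snippet runs without Guava. The distinction the reviewer is drawing: Guava's `checkArgument` throws `IllegalArgumentException`, signalling bad caller input, while `checkState` throws `IllegalStateException`, which describes an object in the wrong state.

```java
// Hypothetical sketch, not the PR's code: the argument checks suggested in the first review comment.
public final class UploadedNameArgsSketch {

  // Local stand-in for Guava's Preconditions.checkArgument: rejects bad caller input
  // with IllegalArgumentException (Guava's checkState would throw IllegalStateException instead).
  static void checkArgument(boolean expression, String errorMessage) {
    if (!expression) {
      throw new IllegalArgumentException(errorMessage);
    }
  }

  // Validates the constructor parameters the name generator receives.
  static void validate(int partitionId, long creationTimeMillis) {
    checkArgument(partitionId >= 0, "Partition id must be non-negative");
    checkArgument(creationTimeMillis > 0, "Creation time must be positive");
  }

  public static void main(String[] args) {
    validate(0, 1000L); // valid input: no exception
    try {
      validate(-1, 1000L);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // prints "Partition id must be non-negative"
    }
  }
}
```

In the real generator the checks would presumably call Guava directly, since `Preconditions` is already imported in the diff.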

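The second and third comments concern how two uploaded segments break a comparison tie. The sketch below is hypothetical and standalone: instead of Pinot's `UploadedRealtimeSegmentName`, it takes the sequence id and creation time as plain parameters. `seqIdFirst` follows the PR's ordering with the missing `return` restored as the second comment suggests; `creationTimeFirst` follows the ordering proposed in the third comment. Both return true when the replacing segment should win. The inputs in `main` come from the test above (uploadedSegment3: seqId 3, ctime 1000; newUploadedSegment2: seqId 2, ctime 1020).

```java
// Hypothetical sketch: tie-breaking between two uploaded segments that hold the same primary key.
// Only the sequence id and creation time are modeled; the real code reads these from
// UploadedRealtimeSegmentName and segment metadata.
public final class UploadedTieBreakSketch {

  // Ordering in the PR (sequence id first, then creation time), with the missing `return` restored.
  static boolean seqIdFirst(int seqId, long ctimeMs, int currentSeqId, long currentCtimeMs) {
    int comparisonResult = Integer.compare(seqId, currentSeqId);
    return comparisonResult == 0 ? Long.compare(ctimeMs, currentCtimeMs) > 0 : comparisonResult > 0;
  }

  // Ordering proposed in the third review comment: creation time first, and the sequence id
  // only breaks ties between segments created at the same time.
  static boolean creationTimeFirst(int seqId, long ctimeMs, int currentSeqId, long currentCtimeMs) {
    int comparisonResult = Long.compare(ctimeMs, currentCtimeMs);
    return comparisonResult == 0 ? seqId > currentSeqId : comparisonResult > 0;
  }

  public static void main(String[] args) {
    // uploadedSegment3 (seqId 3, ctime 1000) arrives while newUploadedSegment2 (seqId 2, ctime 1020) holds the key
    System.out.println(seqIdFirst(3, 1000L, 2, 1020L));        // true: segment3 wins on sequence id
    System.out.println(creationTimeFirst(3, 1000L, 2, 1020L)); // false: newUploadedSegment2 is newer
  }
}
```

With creation time compared first, the sequence id only matters between uploaded segments created in the same millisecond, which matches the reviewer's point that sequence ids are not meaningfully comparable across different creation times.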