klsince commented on code in PR #13107:
URL: https://github.com/apache/pinot/pull/13107#discussion_r1625083242


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/UploadedRealtimeSegmentNameGenerator.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.creator.name;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
+
+
+/**
+ * Implementation for generating segment names of the format UploadedRealtimeSegmentName:
+ * uploaded__{tableName}__{partitionId}__{sequenceId}__{creationTime}__{optionalSuffix}
+ *
+ * <p>This naming convention is adopted to represent uploaded segments to a realtime table. The semantic is similar
+ * to LLCSegmentName. Scenarios where this naming convention can be preferred is:
+ * <li> Generating segments from a batch workload
+ * <li> Minion based segment transformations
+ */
+public class UploadedRealtimeSegmentNameGenerator implements SegmentNameGenerator {
+
+  private static final String SEGMENT_NAME_PREFIX = "uploaded";
+  private static final String DELIMITER = "__";
+  private final String _tableName;
+  private final int _partitionId;
+  // creation time must be in long and milliseconds since epoch to be consistent with creation.meta time for valid
+  // comparison in segment replace flow.
+  private final long _creationTimeMillis;
+  @Nullable
+  private final String _suffix;
+
+  public UploadedRealtimeSegmentNameGenerator(String tableName, int partitionId, long creationTimeMillis,
+      String suffix) {
+    Preconditions.checkState(creationTimeMillis > 0, "Creation time must be positive");

Review Comment:
   nit: use `Preconditions.checkArgument()` instead of `checkState()`, since these are parameters passed in.
   
   Also, should we check that `partitionId` is >= 0?
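
A minimal sketch of the suggested validation, not the PR's actual code. The class name `SegmentNameArgsSketch` and the local `checkArgument` helper are hypothetical; the helper stands in for Guava's `Preconditions.checkArgument` so the snippet compiles without dependencies. The only behavior relied on is that `checkArgument` throws `IllegalArgumentException`, whereas `checkState` throws `IllegalStateException`.

```java
// Hypothetical, simplified stand-in for the constructor under review.
final class SegmentNameArgsSketch {
  final String tableName;
  final int partitionId;
  final long creationTimeMillis;

  SegmentNameArgsSketch(String tableName, int partitionId, long creationTimeMillis) {
    // Invalid caller input is reported as IllegalArgumentException.
    checkArgument(partitionId >= 0, "Partition id must be non-negative");
    checkArgument(creationTimeMillis > 0, "Creation time must be positive");
    this.tableName = tableName;
    this.partitionId = partitionId;
    this.creationTimeMillis = creationTimeMillis;
  }

  // Local helper mirroring the exception type of Guava's Preconditions.checkArgument.
  static void checkArgument(boolean expression, String message) {
    if (!expression) {
      throw new IllegalArgumentException(message);
    }
  }

  public static void main(String[] args) {
    new SegmentNameArgsSketch("myTable", 0, 1000L); // accepted
    try {
      new SegmentNameArgsSketch("myTable", -1, 1000L);
    } catch (IllegalArgumentException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```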



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java:
##########
@@ -158,6 +158,52 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
     }
   }
 
+  /**
+   * <li> When the replacing segment and current segment are of {@link LLCSegmentName} then the PK should resolve to
+   * row in segment with higher sequence id.
+   * <li> When the replacing segment and current segment are of {@link UploadedRealtimeSegmentName} then the PK
+   * should resolve to row in segment with higher sequence id, creation time.
+   * <li> When either is of type {@link UploadedRealtimeSegmentName} then resolve on creation time, if same(rare
+   * scenario) then give preference to uploaded time
+   *
+   * @param segmentName replacing segment name
+   * @param currentSegmentName current segment name having the record for the given primary key
+   * @param segmentCreationTimeMs replacing segment creation time
+   * @param currentSegmentCreationTimeMs current segment creation time
+   * @return true if the record in replacing segment should replace the record in current segment
+   */
+  private boolean shouldReplaceOnComparisonTie(String segmentName, String currentSegmentName,
+      long segmentCreationTimeMs, long currentSegmentCreationTimeMs) {
+
+    LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName);
+    LLCSegmentName currentLLCSegmentName = LLCSegmentName.of(currentSegmentName);
+    if (llcSegmentName != null && currentLLCSegmentName != null) {
+      return llcSegmentName.getSequenceNumber() > currentLLCSegmentName.getSequenceNumber();
+    }
+
+    UploadedRealtimeSegmentName uploadedSegmentName = UploadedRealtimeSegmentName.of(segmentName);
+    UploadedRealtimeSegmentName currentUploadedSegmentName = UploadedRealtimeSegmentName.of(currentSegmentName);
+
+    if (uploadedSegmentName != null && currentUploadedSegmentName != null) {
+      int comparisonResult =
+          Integer.compare(uploadedSegmentName.getSequenceId(), currentUploadedSegmentName.getSequenceId());
+      if (comparisonResult == 0) {
+        Long.compare(segmentCreationTimeMs, currentSegmentCreationTimeMs);

Review Comment:
   Missing `return` here? As written, the result of `Long.compare(...)` is discarded.
   
   Btw, this can be inlined into `return res == 0 ? segmentCreationTimeMs > currentSegmentCreationTimeMs : res > 0`
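
A rough, self-contained sketch of the inlined form, assuming the intent is "higher sequence id wins, and creation time breaks ties". The class and method names are hypothetical, and plain `int`/`long` parameters replace the parsed segment-name objects. Using a strict `>` on a full tie (so the current record is kept) is an assumption, not something the PR states. Note that `Long.compare` returns an `int`, so it has to be turned into a boolean before it can share a ternary with `res > 0`.

```java
// Hypothetical helper: sequence id first, creation time as the tie-breaker.
final class UploadedTieBreakSketch {

  static boolean shouldReplace(int seqId, int currentSeqId,
      long creationTimeMs, long currentCreationTimeMs) {
    int res = Integer.compare(seqId, currentSeqId);
    // Assumption: when both sequence id and creation time are equal, keep the current record.
    return res == 0 ? creationTimeMs > currentCreationTimeMs : res > 0;
  }

  public static void main(String[] args) {
    // Same sequence id, newer creation time: the replacing segment wins.
    System.out.println(shouldReplace(2, 2, 1020L, 1010L));
    // Higher sequence id wins regardless of creation time.
    System.out.println(shouldReplace(3, 2, 1000L, 1020L));
  }
}
```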



##########
pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java:
##########
@@ -378,6 +379,115 @@ public void testAddReplaceRemoveSegmentWithRecordDelete()
     verifyAddReplaceRemoveSegmentWithRecordDelete(HashFunction.MURMUR3, true);
   }
 
+  @Test
+  public void verifyAddReplaceUploadedSegment()
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(HashFunction.NONE).build());
+    Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap;
+    Set<IndexSegment> trackedSegments = upsertMetadataManager._trackedSegments;
+
+    // Add the first segment
+    int numRecords = 6;
+    int[] primaryKeys = new int[]{0, 1, 2, 0, 1, 0};
+    int[] timestamps = new int[]{100, 100, 100, 80, 120, 100};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    SegmentMetadataImpl segmentMetadata = mock(SegmentMetadataImpl.class);
+    when(segmentMetadata.getIndexCreationTime()).thenReturn(1000L);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithSegmentMetadata(1, validDocIds1, null, primaryKeys1, segmentMetadata, null);
+    List<RecordInfo> recordInfoList1;
+    // get recordInfo by iterating all records.
+    recordInfoList1 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
+    upsertMetadataManager.addSegment(segment1, validDocIds1, null, recordInfoList1.iterator());
+    trackedSegments.add(segment1);
+    // segment1: 0 -> {5, 100}, 1 -> {4, 120}, 2 -> {2, 100}
+    assertEquals(recordLocationMap.size(), 3);
+    checkRecordLocation(recordLocationMap, 0, segment1, 5, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, segment1, 2, 100, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{2, 4, 5});
+
+    // Add the second segment of uploaded name format with higher creation time
+    numRecords = 5;
+    primaryKeys = new int[]{0, 1, 2, 3, 0};
+    timestamps = new int[]{100, 100, 120, 80, 80};
+    ThreadSafeMutableRoaringBitmap validDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment2 =
+        mockUploadedImmutableSegment(2, validDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1010L);
+    List<RecordInfo> recordInfoList2;
+    // get recordInfo by iterating all records.
+    recordInfoList2 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
+    upsertMetadataManager.addSegment(uploadedSegment2, validDocIds2, null, recordInfoList2.iterator());
+    trackedSegments.add(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}
+    // uploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, uploadedSegment2, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, uploadedSegment2, 2, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, uploadedSegment2, 3, 80, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+
+    // replace uploadedSegment2 with higher creation time
+    ThreadSafeMutableRoaringBitmap newValidDocIds2 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl newUploadedSegment2 =
+        mockUploadedImmutableSegment(2, newValidDocIds2, null, getPrimaryKeyList(numRecords, primaryKeys), 1020L);
+    upsertMetadataManager.replaceSegment(newUploadedSegment2, newValidDocIds2, null, recordInfoList2.iterator(),
+        uploadedSegment2);
+    trackedSegments.add(newUploadedSegment2);
+    trackedSegments.remove(uploadedSegment2);
+
+    // segment1: 1 -> {4, 120}
+    // newUploadedSegment2: 0 -> {0, 100}, 2 -> {2, 120}, 3 -> {3, 80}
+    assertEquals(recordLocationMap.size(), 4);
+    checkRecordLocation(recordLocationMap, 0, newUploadedSegment2, 0, 100, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 1, segment1, 4, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 2, newUploadedSegment2, 2, 120, HashFunction.NONE);
+    checkRecordLocation(recordLocationMap, 3, newUploadedSegment2, 3, 80, HashFunction.NONE);
+    assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{4});
+    assertEquals(validDocIds2.getMutableRoaringBitmap().toArray(), new int[]{0, 2, 3});
+
+    // add uploaded segment3 with same creation time as segment1 and higher sequence id than newUploadedSegment2
+    numRecords = 2;
+    primaryKeys = new int[]{0, 1};
+    timestamps = new int[]{100, 120};
+    ThreadSafeMutableRoaringBitmap validDocIds3 = new ThreadSafeMutableRoaringBitmap();
+    ImmutableSegmentImpl uploadedSegment3 =
+        mockUploadedImmutableSegment(3, validDocIds3, null, getPrimaryKeyList(numRecords, primaryKeys), 1000L);
+    List<RecordInfo> recordInfoList3;
+    // get recordInfo by iterating all records.
+    recordInfoList3 = getRecordInfoList(numRecords, primaryKeys, timestamps, null);
+    upsertMetadataManager.addSegment(uploadedSegment3, validDocIds3, null, recordInfoList3.iterator());
+    trackedSegments.add(uploadedSegment3);
+
+    // newUploadedSegment2: 2 -> {2, 120}, 3 -> {3, 80}
+    // segment3: 0 -> {0, 100}, 1 -> {1, 120}

Review Comment:
   For `0 -> {0, 100}`, segment3 won over newUploadedSegment2? But segment3 has the smaller ctime (1000 vs. 1020).
   
   So I wonder whether we should compare ctime first and then seqId for UploadedRealtimeSegmentName. The seqId is used to break ties (or avoid name conflicts) among uploaded realtime segments with the same creation time, and it's not really comparable across uploaded segments with different ctimes.
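
To make the proposed ordering concrete, here is a minimal sketch that compares creation time first and falls back to the sequence id only when the creation times are equal. The class and method names are hypothetical. The values in `main` are taken from the test above, assuming the first argument to `mockUploadedImmutableSegment` is the sequence id: segment3 has seqId 3 and ctime 1000, newUploadedSegment2 has seqId 2 and ctime 1020.

```java
// Hypothetical helper: creation time first, sequence id only as the tie-breaker.
final class CreationTimeFirstSketch {

  static boolean shouldReplace(int seqId, int currentSeqId,
      long creationTimeMs, long currentCreationTimeMs) {
    int res = Long.compare(creationTimeMs, currentCreationTimeMs);
    // Sequence ids are only compared between segments created at the same time.
    return res == 0 ? seqId > currentSeqId : res > 0;
  }

  public static void main(String[] args) {
    // segment3 (seqId 3, ctime 1000) arriving while newUploadedSegment2 (seqId 2, ctime 1020) holds the key.
    boolean segment3Wins = shouldReplace(3, 2, 1000L, 1020L);
    System.out.println("segment3 replaces newUploadedSegment2: " + segment3Wins);
  }
}
```

With this ordering, segment3 would not take over primary key 0 in the test above, because its creation time is older even though its sequence id is higher.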



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
